You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:40:23 UTC
[46/51] [partial] incubator-sdap-nexus git commit: SDAP-1 Import all
code under the SDAP SGA
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
new file mode 100644
index 0000000..1f31267
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
@@ -0,0 +1,391 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import logging
+from datetime import datetime
+
+import numpy as np
+import pytz
+from nexustiles.nexustiles import NexusTileService
+from shapely import wkt
+from shapely.geometry import Polygon
+
+from webservice.NexusHandler import nexus_handler, SparkHandler
+from webservice.webmodel import NexusResults, NexusProcessingException
+
+SENTINEL = 'STOP'
+
+EPOCH = pytz.timezone('UTC').localize(datetime(1970, 1, 1))
+
+
+def iso_time_to_epoch(str_time):
+ return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace(
+ tzinfo=pytz.UTC) - EPOCH).total_seconds()
+
+
+@nexus_handler
+class DailyDifferenceAverageSparkImpl(SparkHandler):
+ name = "Daily Difference Average Spark"
+ path = "/dailydifferenceaverage_spark"
+ description = "Subtracts data in box in Dataset 1 from Dataset 2, then averages the difference per day."
+ params = {
+ "dataset": {
+ "name": "Dataset",
+ "type": "string",
+ "description": "The Dataset shortname to use in calculation"
+ },
+ "climatology": {
+ "name": "Climatology",
+ "type": "string",
+ "description": "The Climatology shortname to use in calculation"
+ },
+ "b": {
+ "name": "Bounding box",
+ "type": "comma-delimited float",
+ "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "string",
+ "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "string",
+ "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ"
+ }
+ }
+ singleton = True
+
+ def __init__(self):
+ SparkHandler.__init__(self, skipCassandra=True)
+ self.log = logging.getLogger(__name__)
+
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
+ try:
+ bounding_polygon = request.get_bounding_polygon()
+ except:
+ try:
+ minLat = request.get_min_lat()
+ maxLat = request.get_max_lat()
+ minLon = request.get_min_lon()
+ maxLon = request.get_max_lon()
+ bounding_polygon = Polygon([(minLon, minLat), # (west, south)
+ (maxLon, minLat), # (east, south)
+ (maxLon, maxLat), # (east, north)
+ (minLon, maxLat), # (west, north)
+ (minLon, minLat)]) # (west, south)
+ except:
+ raise NexusProcessingException(
+ reason="'b' argument or 'minLon', 'minLat', 'maxLon', and 'maxLat' arguments are required. If 'b' is used, it must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ code=400)
+ dataset = request.get_argument('dataset', None)
+ if dataset is None:
+ dataset = request.get_argument('ds1', None)
+ if dataset is None:
+ raise NexusProcessingException(reason="'dataset' or 'ds1' argument is required", code=400)
+ climatology = request.get_argument('climatology', None)
+ if climatology is None:
+ climatology = request.get_argument('ds2', None)
+ if climatology is None:
+ raise NexusProcessingException(reason="'climatology' or 'ds2' argument is required", code=400)
+
+ try:
+ start_time = request.get_start_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+ try:
+ end_time = request.get_end_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+
+ start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+ end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+ plot = request.get_boolean_arg("plot", default=False)
+
+ return bounding_polygon, dataset, climatology, start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, plot
+
+ def calc(self, request, **args):
+ bounding_polygon, dataset, climatology, start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, plot = self.parse_arguments(
+ request)
+
+ self.log.debug("Querying for tiles in search domain")
+ # Get tile ids in box
+ tile_ids = [tile.tile_id for tile in
+ self._tile_service.find_tiles_in_polygon(bounding_polygon, dataset,
+ start_seconds_from_epoch, end_seconds_from_epoch,
+ fetch_data=False, fl='id',
+ sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+ 'tile_min_lat asc'], rows=5000)]
+
+ # Call spark_matchup
+ self.log.debug("Calling Spark Driver")
+ try:
+ spark_result = spark_anomolies_driver(tile_ids, wkt.dumps(bounding_polygon), dataset, climatology,
+ sc=self._sc)
+ except Exception as e:
+ self.log.exception(e)
+ raise NexusProcessingException(reason="An unknown error occurred while computing average differences",
+ code=500)
+
+ average_and_std_by_day = spark_result
+
+ result = DDAResult(
+ results=[[{'time': dayms, 'mean': avg_std[0], 'std': avg_std[1], 'ds': 0}] for dayms, avg_std in
+ average_and_std_by_day],
+ stats={},
+ meta=self.get_meta(dataset))
+
+ min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds
+ result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time)
+ return result
+
+ def get_meta(self, dataset):
+
+ # TODO yea this is broken
+ if 'sst' in dataset.lower():
+ meta = {
+ "title": "Sea Surface Temperature (SST) Anomalies",
+ "description": "SST anomalies are departures from the 5-day pixel mean",
+ "units": u'\u00B0C',
+ "label": u'Difference from 5-Day mean (\u00B0C)'
+ }
+ elif 'chl' in dataset.lower():
+ meta = {
+ "title": "Chlorophyll Concentration Anomalies",
+ "description": "Chlorophyll Concentration anomalies are departures from the 5-day pixel mean",
+ "units": u'mg m^-3',
+ "label": u'Difference from 5-Day mean (mg m^-3)'
+ }
+ elif 'sla' in dataset.lower():
+ meta = {
+ "title": "Sea Level Anomaly Estimate (SLA) Anomalies",
+ "description": "SLA anomalies are departures from the 5-day pixel mean",
+ "units": u'm',
+ "label": u'Difference from 5-Day mean (m)'
+ }
+ elif 'sss' in dataset.lower():
+ meta = {
+ "title": "Sea Surface Salinity (SSS) Anomalies",
+ "description": "SSS anomalies are departures from the 5-day pixel mean",
+ "units": u'psu',
+ "label": u'Difference from 5-Day mean (psu)'
+ }
+ elif 'ccmp' in dataset.lower():
+ meta = {
+ "title": "Wind Speed Anomalies",
+ "description": "Wind Speed anomalies are departures from the 1-day pixel mean",
+ "units": u'm/s',
+ "label": u'Difference from 1-Day mean (m/s)'
+ }
+ elif 'trmm' in dataset.lower():
+ meta = {
+ "title": "Precipitation Anomalies",
+ "description": "Precipitation anomalies are departures from the 5-day pixel mean",
+ "units": u'mm',
+ "label": u'Difference from 5-Day mean (mm)'
+ }
+ else:
+ meta = {
+ "title": "Anomalies",
+ "description": "Anomalies are departures from the 5-day pixel mean",
+ "units": u'',
+ "label": u'Difference from 5-Day mean'
+ }
+ return meta
+
+
+class DDAResult(NexusResults):
+ def toImage(self):
+ from StringIO import StringIO
+ import matplotlib.pyplot as plt
+ from matplotlib.dates import date2num
+
+ times = [date2num(datetime.fromtimestamp(dayavglistdict[0]['time'], pytz.utc).date()) for dayavglistdict in
+ self.results()]
+ means = [dayavglistdict[0]['mean'] for dayavglistdict in self.results()]
+ plt.plot_date(times, means, '|g-')
+
+ plt.xlabel('Date')
+ plt.xticks(rotation=70)
+ plt.ylabel(u'Difference from 5-Day mean (\u00B0C)')
+ plt.title('Sea Surface Temperature (SST) Anomalies')
+ plt.grid(True)
+ plt.tight_layout()
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ return sio.getvalue()
+
+
+from threading import Lock
+from shapely.geometry import box
+
+DRIVER_LOCK = Lock()
+
+
+class NoClimatologyTile(Exception):
+ pass
+
+
+class NoDatasetTile(Exception):
+ pass
+
+
+def determine_parllelism(num_tiles):
+ """
+ Try to stay at a maximum of 1500 tiles per partition; But don't go over 128 partitions.
+ Also, don't go below the default of 8
+ """
+ num_partitions = max(min(num_tiles // 1500, 128), 8)
+ return num_partitions
+
+
+def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None):
+ from functools import partial
+
+ with DRIVER_LOCK:
+ bounding_wkt_b = sc.broadcast(bounding_wkt)
+ dataset_b = sc.broadcast(dataset)
+ climatology_b = sc.broadcast(climatology)
+
+ # Parallelize list of tile ids
+ rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids)))
+
+ def add_tuple_elements(tuple1, tuple2):
+ cumulative_sum = tuple1[0] + tuple2[0]
+ cumulative_count = tuple1[1] + tuple2[1]
+
+ avg_1 = tuple1[0] / tuple1[1]
+ avg_2 = tuple2[0] / tuple2[1]
+
+ cumulative_var = parallel_variance(avg_1, tuple1[1], tuple1[2], avg_2, tuple2[1], tuple2[2])
+ return cumulative_sum, cumulative_count, cumulative_var
+
+ def parallel_variance(avg_a, count_a, var_a, avg_b, count_b, var_b):
+ # Thanks Wikipedia https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
+ delta = avg_b - avg_a
+ m_a = var_a * (count_a - 1)
+ m_b = var_b * (count_b - 1)
+ M2 = m_a + m_b + delta ** 2 * count_a * count_b / (count_a + count_b)
+ return M2 / (count_a + count_b - 1)
+
+ def compute_avg_and_std(sum_cnt_var_tuple):
+ return sum_cnt_var_tuple[0] / sum_cnt_var_tuple[1], np.sqrt(sum_cnt_var_tuple[2])
+
+ result = rdd \
+ .mapPartitions(partial(calculate_diff, bounding_wkt=bounding_wkt_b, dataset=dataset_b,
+ climatology=climatology_b)) \
+ .reduceByKey(add_tuple_elements) \
+ .mapValues(compute_avg_and_std) \
+ .sortByKey() \
+ .collect()
+
+ return result
+
+
+def calculate_diff(tile_ids, bounding_wkt, dataset, climatology):
+ from itertools import chain
+
+ # Construct a list of generators that yield (day, sum, count, variance)
+ diff_generators = []
+
+ tile_ids = list(tile_ids)
+ if len(tile_ids) == 0:
+ return []
+ tile_service = NexusTileService()
+
+ for tile_id in tile_ids:
+ # Get the dataset tile
+ try:
+ dataset_tile = get_dataset_tile(tile_service, wkt.loads(bounding_wkt.value), tile_id)
+ except NoDatasetTile:
+ # This should only happen if all measurements in a tile become masked after applying the bounding polygon
+ continue
+
+ tile_day_of_year = dataset_tile.min_time.timetuple().tm_yday
+
+ # Get the climatology tile
+ try:
+ climatology_tile = get_climatology_tile(tile_service, wkt.loads(bounding_wkt.value),
+ box(dataset_tile.bbox.min_lon,
+ dataset_tile.bbox.min_lat,
+ dataset_tile.bbox.max_lon,
+ dataset_tile.bbox.max_lat),
+ climatology.value,
+ tile_day_of_year)
+ except NoClimatologyTile:
+ continue
+
+ diff_generators.append(generate_diff(dataset_tile, climatology_tile))
+
+ return chain(*diff_generators)
+
+
+def get_dataset_tile(tile_service, search_bounding_shape, tile_id):
+ the_time = datetime.now()
+
+ try:
+ # Load the dataset tile
+ dataset_tile = tile_service.find_tile_by_id(tile_id)[0]
+ # Mask it to the search domain
+ dataset_tile = tile_service.mask_tiles_to_polygon(search_bounding_shape, [dataset_tile])[0]
+ except IndexError:
+ raise NoDatasetTile()
+
+ print "%s Time to load dataset tile %s" % (str(datetime.now() - the_time), dataset_tile.tile_id)
+ return dataset_tile
+
+
+def get_climatology_tile(tile_service, search_bounding_shape, tile_bounding_shape, climatology_dataset,
+ tile_day_of_year):
+ the_time = datetime.now()
+ try:
+ # Load the tile
+ climatology_tile = tile_service.find_tile_by_polygon_and_most_recent_day_of_year(
+ tile_bounding_shape,
+ climatology_dataset,
+ tile_day_of_year)[0]
+
+ except IndexError:
+ raise NoClimatologyTile()
+
+ if search_bounding_shape.contains(tile_bounding_shape):
+ # The tile is totally contained in the search area, we don't need to mask it.
+ pass
+ else:
+ # The tile is not totally contained in the search area,
+ # we need to mask the data to the search domain.
+ try:
+ # Mask it to the search domain
+ climatology_tile = tile_service.mask_tiles_to_polygon(search_bounding_shape, [climatology_tile])[0]
+ except IndexError:
+ raise NoClimatologyTile()
+
+ print "%s Time to load climatology tile %s" % (str(datetime.now() - the_time), climatology_tile.tile_id)
+ return climatology_tile
+
+
+def generate_diff(data_tile, climatology_tile):
+ the_time = datetime.now()
+
+ diff = np.subtract(data_tile.data, climatology_tile.data)
+ diff_sum = np.nansum(diff)
+ diff_var = np.nanvar(diff)
+ diff_ct = np.ma.count(diff)
+
+ date_in_seconds = int((datetime.combine(data_tile.min_time.date(), datetime.min.time()).replace(
+ tzinfo=pytz.UTC) - EPOCH).total_seconds())
+
+ print "%s Time to generate diff between %s and %s" % (
+ str(datetime.now() - the_time), data_tile.tile_id, climatology_tile.tile_id)
+
+ yield (date_in_seconds, (diff_sum, diff_ct, diff_var))
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/HofMoellerSpark.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
new file mode 100644
index 0000000..f6a0118
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -0,0 +1,331 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import itertools
+import logging
+import traceback
+from cStringIO import StringIO
+from datetime import datetime
+from multiprocessing.dummy import Pool, Manager
+
+import matplotlib.pyplot as plt
+import mpld3
+import numpy as np
+from nexustiles.nexustiles import NexusTileService
+from matplotlib import cm
+from matplotlib.ticker import FuncFormatter
+
+from webservice.NexusHandler import SparkHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusProcessingException, NexusResults
+
+SENTINEL = 'STOP'
+LATITUDE = 0
+LONGITUDE = 1
+
+
+class LongitudeHofMoellerCalculator(object):
+ @staticmethod
+ def longitude_time_hofmoeller_stats(tile_in_spark):
+
+ (tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark
+
+ tile_service = NexusTileService()
+ try:
+ # Load the dataset tile
+ tile = tile_service.find_tile_by_id(tile_id)[0]
+ # Mask it to the search domain
+ tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, [tile])[0]
+ except IndexError:
+ return None
+
+ stat = {
+ 'sequence': index,
+ 'time': np.ma.min(tile.times),
+ 'lons': []
+ }
+ points = list(tile.nexus_point_generator())
+ data = sorted(points, key=lambda p: p.longitude)
+ points_by_lon = itertools.groupby(data, key=lambda p: p.longitude)
+
+ for lon, points_at_lon in points_by_lon:
+ values_at_lon = np.array([point.data_val for point in points_at_lon])
+ stat['lons'].append({
+ 'longitude': float(lon),
+ 'cnt': len(values_at_lon),
+ 'avg': np.mean(values_at_lon).item(),
+ 'max': np.max(values_at_lon).item(),
+ 'min': np.min(values_at_lon).item(),
+ 'std': np.std(values_at_lon).item()
+ })
+
+ return stat
+
+
+class LatitudeHofMoellerCalculator(object):
+ @staticmethod
+ def latitude_time_hofmoeller_stats(tile_in_spark):
+
+ (tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark
+
+ tile_service = NexusTileService()
+ try:
+ # Load the dataset tile
+ tile = tile_service.find_tile_by_id(tile_id)[0]
+ # Mask it to the search domain
+ tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, [tile])[0]
+ except IndexError:
+ return None
+
+ stat = {
+ 'sequence': index,
+ 'time': np.ma.min(tile.times),
+ 'lats': []
+ }
+
+ points = list(tile.nexus_point_generator())
+ data = sorted(points, key=lambda p: p.latitude)
+ points_by_lat = itertools.groupby(data, key=lambda p: p.latitude)
+
+ for lat, points_at_lat in points_by_lat:
+ values_at_lat = np.array([point.data_val for point in points_at_lat])
+
+ stat['lats'].append({
+ 'latitude': float(lat),
+ 'cnt': len(values_at_lat),
+ 'avg': np.mean(values_at_lat).item(),
+ 'max': np.max(values_at_lat).item(),
+ 'min': np.min(values_at_lat).item(),
+ 'std': np.std(values_at_lat).item()
+ })
+
+ return stat
+
+
+class BaseHoffMoellerHandlerImpl(SparkHandler):
+ def __init__(self):
+ SparkHandler.__init__(self)
+ self.log = logging.getLogger(__name__)
+
+ def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True):
+ shape = (len(results), len(results[0][pivot]))
+ if shape[0] <= 12:
+ return results
+ for a in range(0, 12):
+ values = []
+ for b in range(a, len(results), 12):
+ values.append(np.average([l[field] for l in results[b][pivot]]))
+ avg = np.average(values)
+
+ for b in range(a, len(results), 12):
+ for l in results[b][pivot]:
+ l["%sSeasonal" % field] = l[field] - avg
+
+ return results
+
+ def applyDeseasonToHofMoeller(self, results, pivot="lats", append=True):
+ results = self.applyDeseasonToHofMoellerByField(results, pivot, field="avg", append=append)
+ results = self.applyDeseasonToHofMoellerByField(results, pivot, field="min", append=append)
+ results = self.applyDeseasonToHofMoellerByField(results, pivot, field="max", append=append)
+ return results
+
+def determine_parllelism(num_tiles):
+ """
+ Try to stay at a maximum of 1500 tiles per partition; But don't go over 128 partitions.
+ Also, don't go below the default of 8
+ """
+ num_partitions = max(min(num_tiles // 1500, 128), 8)
+ return num_partitions
+
+@nexus_handler
+class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
+ name = "Latitude/Time HofMoeller Spark"
+ path = "/latitudeTimeHofMoellerSpark"
+ description = "Computes a latitude/time HofMoeller plot given an arbitrary geographical area and time range"
+ params = DEFAULT_PARAMETERS_SPEC
+ singleton = True
+
+ def __init__(self):
+ BaseHoffMoellerHandlerImpl.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ nexus_tiles_spark = [(tile.tile_id, x, computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon()) for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
+ computeOptions.get_min_lon(), computeOptions.get_max_lon(),
+ computeOptions.get_dataset()[0],
+ computeOptions.get_start_time(),
+ computeOptions.get_end_time(), fetch_data=False))]
+
+ if len(nexus_tiles_spark) == 0:
+ raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
+
+ # Parallelize list of tile ids
+ rdd = self._sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
+ results = rdd.map(LatitudeHofMoellerCalculator.latitude_time_hofmoeller_stats).collect()
+
+ results = filter(None, results)
+ results = sorted(results, key=lambda entry: entry["time"])
+
+ results = self.applyDeseasonToHofMoeller(results)
+
+ result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LATITUDE)
+ return result
+
+
+@nexus_handler
+class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
+ name = "Longitude/Time HofMoeller Spark"
+ path = "/longitudeTimeHofMoellerSpark"
+ description = "Computes a longitude/time HofMoeller plot given an arbitrary geographical area and time range"
+ params = DEFAULT_PARAMETERS_SPEC
+ singleton = True
+
+ def __init__(self):
+ BaseHoffMoellerHandlerImpl.__init__(self)
+
+ def calc(self, computeOptions, **args):
+ nexus_tiles_spark = [(tile.tile_id, x, computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon()) for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
+ computeOptions.get_min_lon(), computeOptions.get_max_lon(),
+ computeOptions.get_dataset()[0],
+ computeOptions.get_start_time(),
+ computeOptions.get_end_time(), fetch_data=False))]
+
+ if len(nexus_tiles_spark) == 0:
+ raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
+
+ # Parallelize list of tile ids
+ rdd = self._sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
+ results = rdd.map(LongitudeHofMoellerCalculator.longitude_time_hofmoeller_stats).collect()
+
+ results = filter(None, results)
+ results = sorted(results, key=lambda entry: entry["time"])
+
+ results = self.applyDeseasonToHofMoeller(results, pivot="lons")
+
+ result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LONGITUDE)
+ return result
+
+
+class HoffMoellerResults(NexusResults):
+ LATITUDE = 0
+ LONGITUDE = 1
+
+ def __init__(self, results=None, meta=None, stats=None, computeOptions=None, **args):
+ NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=computeOptions)
+ self.__type = args['type']
+
+ def createHoffmueller(self, data, coordSeries, timeSeries, coordName, title, interpolate='nearest'):
+ cmap = cm.coolwarm
+ # ls = LightSource(315, 45)
+ # rgb = ls.shade(data, cmap)
+
+ fig, ax = plt.subplots()
+ fig.set_size_inches(11.0, 8.5)
+ cax = ax.imshow(data, interpolation=interpolate, cmap=cmap)
+
+ def yFormatter(y, pos):
+ if y < len(coordSeries):
+ return "%s $^\circ$" % (int(coordSeries[int(y)] * 100.0) / 100.)
+ else:
+ return ""
+
+ def xFormatter(x, pos):
+ if x < len(timeSeries):
+ return timeSeries[int(x)].strftime('%b %Y')
+ else:
+ return ""
+
+ ax.xaxis.set_major_formatter(FuncFormatter(xFormatter))
+ ax.yaxis.set_major_formatter(FuncFormatter(yFormatter))
+
+ ax.set_title(title)
+ ax.set_ylabel(coordName)
+ ax.set_xlabel('Date')
+
+ fig.colorbar(cax)
+ fig.autofmt_xdate()
+
+ labels = ['point {0}'.format(i + 1) for i in range(len(data))]
+ # plugins.connect(fig, plugins.MousePosition(fontsize=14))
+ tooltip = mpld3.plugins.PointLabelTooltip(cax, labels=labels)
+
+ sio = StringIO()
+ plt.savefig(sio, format='png')
+ return sio.getvalue()
+
+ def createLongitudeHoffmueller(self, res, meta):
+ lonSeries = [m['longitude'] for m in res[0]['lons']]
+ timeSeries = [datetime.fromtimestamp(m['time'] / 1000) for m in res]
+
+ data = np.zeros((len(lonSeries), len(timeSeries)))
+
+ plotSeries = self.computeOptions().get_plot_series(default="avg") if self.computeOptions is not None else None
+ if plotSeries is None:
+ plotSeries = "avg"
+
+ for t in range(0, len(timeSeries)):
+ timeSet = res[t]
+ for l in range(0, len(lonSeries)):
+ latSet = timeSet['lons'][l]
+ value = latSet[plotSeries]
+ data[len(lonSeries) - l - 1][t] = value
+
+ title = meta['title']
+ source = meta['source']
+ dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+ return self.createHoffmueller(data, lonSeries, timeSeries, "Longitude",
+ "%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest')
+
+ def createLatitudeHoffmueller(self, res, meta):
+ latSeries = [m['latitude'] for m in res[0]['lats']]
+ timeSeries = [datetime.fromtimestamp(m['time'] / 1000) for m in res]
+
+ data = np.zeros((len(latSeries), len(timeSeries)))
+
+ plotSeries = self.computeOptions().get_plot_series(default="avg") if self.computeOptions is not None else None
+ if plotSeries is None:
+ plotSeries = "avg"
+
+ for t in range(0, len(timeSeries)):
+ timeSet = res[t]
+ for l in range(0, len(latSeries)):
+ latSet = timeSet['lats'][l]
+ value = latSet[plotSeries]
+ data[len(latSeries) - l - 1][t] = value
+
+ title = meta['title']
+ source = meta['source']
+ dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+ return self.createHoffmueller(data, latSeries, timeSeries, "Latitude",
+ title="%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest')
+
+ def toImage(self):
+ res = self.results()
+ meta = self.meta()
+
+ if self.__type == HoffMoellerResults.LATITUDE:
+ return self.createLatitudeHoffmueller(res, meta)
+ elif self.__type == HoffMoellerResults.LONGITUDE:
+ return self.createLongitudeHoffmueller(res, meta)
+ else:
+ raise Exception("Unsupported HoffMoeller Plot Type")
+
+
+def pool_worker(type, work_queue, done_queue):
+ try:
+
+ if type == LATITUDE:
+ calculator = LatitudeHofMoellerCalculator()
+ elif type == LONGITUDE:
+ calculator = LongitudeHofMoellerCalculator()
+
+ for work in iter(work_queue.get, SENTINEL):
+ scifunction = work[0]
+ args = work[1:]
+ result = calculator.__getattribute__(scifunction)(*args)
+ done_queue.put(result)
+
+ except Exception as e:
+ e_str = traceback.format_exc(e)
+ done_queue.put({'error': e_str})
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/Matchup.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
new file mode 100644
index 0000000..8d90389
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -0,0 +1,691 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+
+import json
+import logging
+import threading
+import time
+from datetime import datetime
+from itertools import chain
+from math import cos, radians
+
+import numpy as np
+import pyproj
+import requests
+from nexustiles.nexustiles import NexusTileService
+from pytz import timezone, UTC
+from scipy import spatial
+from shapely import wkt
+from shapely.geometry import Point
+from shapely.geometry import box
+from shapely.geos import ReadingError
+
+from webservice.NexusHandler import SparkHandler, nexus_handler
+from webservice.algorithms.doms import config as edge_endpoints
+from webservice.algorithms.doms import values as doms_values
+from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults
+from webservice.algorithms.doms.ResultsStorage import ResultsStorage
+from webservice.webmodel import NexusProcessingException
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+def iso_time_to_epoch(str_time):
+ return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace(
+ tzinfo=UTC) - EPOCH).total_seconds()
+
+
+@nexus_handler
+class Matchup(SparkHandler):
+ name = "Matchup"
+ path = "/match_spark"
+ description = "Match measurements between two or more datasets"
+
+ params = {
+ "primary": {
+ "name": "Primary Dataset",
+ "type": "string",
+ "description": "The Primary dataset used to find matches for. Required"
+ },
+ "matchup": {
+ "name": "Match-Up Datasets",
+ "type": "comma-delimited string",
+ "description": "The Dataset(s) being searched for measurements that match the Primary. Required"
+ },
+ "parameter": {
+ "name": "Match-Up Parameter",
+ "type": "string",
+ "description": "The parameter of interest used for the match up. One of 'sst', 'sss', 'wind'. Required"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "string",
+ "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "string",
+ "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "b": {
+ "name": "Bounding box",
+ "type": "comma-delimited float",
+ "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+ },
+ "depthMin": {
+ "name": "Minimum Depth",
+ "type": "float",
+ "description": "Minimum depth of measurements. Must be less than depthMax. Optional. Default: no limit"
+ },
+ "depthMax": {
+ "name": "Maximum Depth",
+ "type": "float",
+ "description": "Maximum depth of measurements. Must be greater than depthMin. Optional. Default: no limit"
+ },
+ "tt": {
+ "name": "Time Tolerance",
+ "type": "long",
+ "description": "Tolerance in time (seconds) when comparing two measurements. Optional. Default: 86400"
+ },
+ "rt": {
+ "name": "Radius Tolerance",
+ "type": "float",
+ "description": "Tolerance in radius (meters) when comparing two measurements. Optional. Default: 1000"
+ },
+ "platforms": {
+ "name": "Platforms",
+ "type": "comma-delimited integer",
+ "description": "Platforms to include for matchup consideration. Required"
+ },
+ "matchOnce": {
+ "name": "Match Once",
+ "type": "boolean",
+ "description": "Optional True/False flag used to determine if more than one match per primary point is returned. "
+ + "If true, only the nearest point will be returned for each primary point. "
+ + "If false, all points within the tolerances will be returned for each primary point. Default: False"
+ },
+ "resultSizeLimit": {
+ "name": "Result Size Limit",
+ "type": "int",
+ "description": "Optional integer value that limits the number of results returned from the matchup. "
+ "If the number of primary matches is greater than this limit, the service will respond with "
+ "(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. "
+ "Default: 500"
+ }
+ }
+ singleton = True
+
+ def __init__(self):
+ SparkHandler.__init__(self, skipCassandra=True)
+ self.log = logging.getLogger(__name__)
+
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
+ try:
+ bounding_polygon = request.get_bounding_polygon()
+ except:
+ raise NexusProcessingException(
+ reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ code=400)
+ primary_ds_name = request.get_argument('primary', None)
+ if primary_ds_name is None:
+ raise NexusProcessingException(reason="'primary' argument is required", code=400)
+ matchup_ds_names = request.get_argument('matchup', None)
+ if matchup_ds_names is None:
+ raise NexusProcessingException(reason="'matchup' argument is required", code=400)
+
+ parameter_s = request.get_argument('parameter', 'sst')
+ if parameter_s not in ['sst', 'sss', 'wind']:
+ raise NexusProcessingException(
+ reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+ try:
+ start_time = request.get_start_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+ try:
+ end_time = request.get_end_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+
+ if start_time > end_time:
+ raise NexusProcessingException(
+ reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+ request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+ code=400)
+
+ depth_min = request.get_decimal_arg('depthMin', default=None)
+ depth_max = request.get_decimal_arg('depthMax', default=None)
+
+ if depth_min is not None and depth_max is not None and depth_min >= depth_max:
+ raise NexusProcessingException(
+ reason="Depth Min should be less than Depth Max", code=400)
+
+ time_tolerance = request.get_int_arg('tt', default=86400)
+ radius_tolerance = request.get_decimal_arg('rt', default=1000.0)
+ platforms = request.get_argument('platforms', None)
+ if platforms is None:
+ raise NexusProcessingException(reason="'platforms' argument is required", code=400)
+ try:
+ p_validation = platforms.split(',')
+ p_validation = [int(p) for p in p_validation]
+ del p_validation
+ except:
+ raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
+
+ match_once = request.get_boolean_arg("matchOnce", default=False)
+
+ result_size_limit = request.get_int_arg("resultSizeLimit", default=500)
+
+ start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+ end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+ return bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
+ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
+ depth_min, depth_max, time_tolerance, radius_tolerance, \
+ platforms, match_once, result_size_limit
+
+ def calc(self, request, **args):
+ start = datetime.utcnow()
+ # TODO Assuming Satellite primary
+ bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
+ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
+ depth_min, depth_max, time_tolerance, radius_tolerance, \
+ platforms, match_once, result_size_limit = self.parse_arguments(request)
+
+ with ResultsStorage() as resultsStorage:
+
+ execution_id = str(resultsStorage.insertExecution(None, start, None, None))
+
+ self.log.debug("Querying for tiles in search domain")
+ # Get tile ids in box
+ tile_ids = [tile.tile_id for tile in
+ self._tile_service.find_tiles_in_polygon(bounding_polygon, primary_ds_name,
+ start_seconds_from_epoch, end_seconds_from_epoch,
+ fetch_data=False, fl='id',
+ sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+ 'tile_min_lat asc'], rows=5000)]
+
+ # Call spark_matchup
+ self.log.debug("Calling Spark Driver")
+ try:
+ spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
+ matchup_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
+ radius_tolerance, platforms, match_once, sc=self._sc)
+ except Exception as e:
+ self.log.exception(e)
+ raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
+
+ end = datetime.utcnow()
+
+ self.log.debug("Building and saving results")
+ args = {
+ "primary": primary_ds_name,
+ "matchup": matchup_ds_names,
+ "startTime": start_time,
+ "endTime": end_time,
+ "bbox": request.get_argument('b'),
+ "timeTolerance": time_tolerance,
+ "radiusTolerance": float(radius_tolerance),
+ "platforms": platforms,
+ "parameter": parameter_s
+ }
+
+ if depth_min is not None:
+ args["depthMin"] = float(depth_min)
+
+ if depth_max is not None:
+ args["depthMax"] = float(depth_max)
+
+ total_keys = len(spark_result.keys())
+ total_values = sum(len(v) for v in spark_result.itervalues())
+ details = {
+ "timeToComplete": int((end - start).total_seconds()),
+ "numInSituRecords": 0,
+ "numInSituMatched": total_values,
+ "numGriddedChecked": 0,
+ "numGriddedMatched": total_keys
+ }
+
+ matches = Matchup.convert_to_matches(spark_result)
+
+ def do_result_insert():
+ with ResultsStorage() as storage:
+ storage.insertResults(results=matches, params=args, stats=details,
+ startTime=start, completeTime=end, userEmail="",
+ execution_id=execution_id)
+
+ threading.Thread(target=do_result_insert).start()
+
+ if 0 < result_size_limit < len(matches):
+ result = DomsQueryResults(results=None, args=args, details=details, bounds=None, count=None,
+ computeOptions=None, executionId=execution_id, status_code=202)
+ else:
+ result = DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None,
+ computeOptions=None, executionId=execution_id)
+
+ return result
+
+ @classmethod
+ def convert_to_matches(cls, spark_result):
+ matches = []
+ for primary_domspoint, matched_domspoints in spark_result.iteritems():
+ p_matched = [cls.domspoint_to_dict(p_match) for p_match in matched_domspoints]
+
+ primary = cls.domspoint_to_dict(primary_domspoint)
+ primary['matches'] = list(p_matched)
+ matches.append(primary)
+ return matches
+
+ @staticmethod
+ def domspoint_to_dict(domspoint):
+ return {
+ "sea_water_temperature": domspoint.sst,
+ "sea_water_temperature_depth": domspoint.sst_depth,
+ "sea_water_salinity": domspoint.sss,
+ "sea_water_salinity_depth": domspoint.sss_depth,
+ "wind_speed": domspoint.wind_speed,
+ "wind_direction": domspoint.wind_direction,
+ "wind_u": domspoint.wind_u,
+ "wind_v": domspoint.wind_v,
+ "platform": doms_values.getPlatformById(domspoint.platform),
+ "device": doms_values.getDeviceById(domspoint.device),
+ "x": str(domspoint.longitude),
+ "y": str(domspoint.latitude),
+ "point": "Point(%s %s)" % (domspoint.longitude, domspoint.latitude),
+ "time": datetime.strptime(domspoint.time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC),
+ "fileurl": domspoint.file_url,
+ "id": domspoint.data_id,
+ "source": domspoint.source,
+ }
+
+
+class DomsPoint(object):
+ def __init__(self, longitude=None, latitude=None, time=None, depth=None, data_id=None):
+
+ self.time = time
+ self.longitude = longitude
+ self.latitude = latitude
+ self.depth = depth
+ self.data_id = data_id
+
+ self.wind_u = None
+ self.wind_v = None
+ self.wind_direction = None
+ self.wind_speed = None
+ self.sst = None
+ self.sst_depth = None
+ self.sss = None
+ self.sss_depth = None
+ self.source = None
+ self.depth = None
+ self.platform = None
+ self.device = None
+ self.file_url = None
+
+ def __repr__(self):
+ return str(self.__dict__)
+
+ @staticmethod
+ def from_nexus_point(nexus_point, tile=None, parameter='sst'):
+ point = DomsPoint()
+
+ point.data_id = "%s[%s]" % (tile.tile_id, nexus_point.index)
+
+ # TODO Not an ideal solution; but it works for now.
+ if parameter == 'sst':
+ point.sst = nexus_point.data_val.item()
+ elif parameter == 'sss':
+ point.sss = nexus_point.data_val.item()
+ elif parameter == 'wind':
+ point.wind_u = nexus_point.data_val.item()
+ try:
+ point.wind_v = tile.meta_data['wind_v'][tuple(nexus_point.index)].item()
+ except (KeyError, IndexError):
+ pass
+ try:
+ point.wind_direction = tile.meta_data['wind_dir'][tuple(nexus_point.index)].item()
+ except (KeyError, IndexError):
+ pass
+ try:
+ point.wind_speed = tile.meta_data['wind_speed'][tuple(nexus_point.index)].item()
+ except (KeyError, IndexError):
+ pass
+ else:
+ raise NotImplementedError('%s not supported. Only sst, sss, and wind parameters are supported.' % parameter)
+
+ point.longitude = nexus_point.longitude.item()
+ point.latitude = nexus_point.latitude.item()
+
+ point.time = datetime.utcfromtimestamp(nexus_point.time).strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ try:
+ point.depth = nexus_point.depth
+ except KeyError:
+ # No depth associated with this measurement
+ pass
+
+ point.sst_depth = 0
+ point.source = tile.dataset
+ point.file_url = tile.granule
+
+ # TODO device should change based on the satellite making the observations.
+ point.platform = 9
+ point.device = 5
+ return point
+
+ @staticmethod
+ def from_edge_point(edge_point):
+ point = DomsPoint()
+
+ try:
+ x, y = wkt.loads(edge_point['point']).coords[0]
+ except ReadingError:
+ try:
+ x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
+ except ValueError:
+ y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
+
+ point.longitude = x
+ point.latitude = y
+
+ point.time = edge_point['time']
+
+ point.wind_u = edge_point.get('eastward_wind')
+ point.wind_v = edge_point.get('northward_wind')
+ point.wind_direction = edge_point.get('wind_direction')
+ point.wind_speed = edge_point.get('wind_speed')
+ point.sst = edge_point.get('sea_water_temperature')
+ point.sst_depth = edge_point.get('sea_water_temperature_depth')
+ point.sss = edge_point.get('sea_water_salinity')
+ point.sss_depth = edge_point.get('sea_water_salinity_depth')
+ point.source = edge_point.get('source')
+ point.platform = edge_point.get('platform')
+ point.device = edge_point.get('device')
+ point.file_url = edge_point.get('fileurl')
+
+ try:
+ point.data_id = unicode(edge_point['id'])
+ except KeyError:
+ point.data_id = "%s:%s:%s" % (point.time, point.longitude, point.latitude)
+
+ return point
+
+
+from threading import Lock
+
+DRIVER_LOCK = Lock()
+
+
+def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_names, parameter, depth_min, depth_max,
+ time_tolerance, radius_tolerance, platforms, match_once, sc=None):
+ from functools import partial
+
+ with DRIVER_LOCK:
+ # Broadcast parameters
+ primary_b = sc.broadcast(primary_ds_name)
+ matchup_b = sc.broadcast(matchup_ds_names)
+ depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None)
+ depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None)
+ tt_b = sc.broadcast(time_tolerance)
+ rt_b = sc.broadcast(float(radius_tolerance))
+ platforms_b = sc.broadcast(platforms)
+ bounding_wkt_b = sc.broadcast(bounding_wkt)
+ parameter_b = sc.broadcast(parameter)
+
+ # Parallelize list of tile ids
+ rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids)))
+
+ # Map Partitions ( list(tile_id) )
+ rdd_filtered = rdd.mapPartitions(
+ partial(match_satellite_to_insitu, primary_b=primary_b, matchup_b=matchup_b, parameter_b=parameter_b, tt_b=tt_b,
+ rt_b=rt_b, platforms_b=platforms_b, bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b,
+ depth_max_b=depth_max_b), preservesPartitioning=True) \
+ .filter(lambda p_m_tuple: abs(
+ iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)) <= time_tolerance)
+
+ if match_once:
+ # Only the 'nearest' point for each primary should be returned. Add an extra map/reduce which calculates
+ # the distance and finds the minimum
+
+ # Method used for calculating the distance between 2 DomsPoints
+ from pyproj import Geod
+
+ def dist(primary, matchup):
+ wgs84_geod = Geod(ellps='WGS84')
+ lat1, lon1 = (primary.latitude, primary.longitude)
+ lat2, lon2 = (matchup.latitude, matchup.longitude)
+ az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2)
+ return distance
+
+ rdd_filtered = rdd_filtered \
+ .map(lambda (primary, matchup): tuple([primary, tuple([matchup, dist(primary, matchup)])])) \
+ .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \
+ .mapValues(lambda x: [x[0]])
+ else:
+ rdd_filtered = rdd_filtered \
+ .combineByKey(lambda value: [value], # Create 1 element list
+ lambda value_list, value: value_list + [value], # Add 1 element to list
+ lambda value_list_a, value_list_b: value_list_a + value_list_b) # Add two lists together
+
+ result_as_map = rdd_filtered.collectAsMap()
+
+ return result_as_map
+
+
+def determine_parllelism(num_tiles):
+ """
+ Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions.
+ Also, don't go below the default of 8
+ """
+ num_partitions = max(min(num_tiles / 140, 128), 8)
+ return num_partitions
+
+
+def add_meters_to_lon_lat(lon, lat, meters):
+ """
+ Uses a simple approximation of
+ 1 degree latitude = 111,111 meters
+ 1 degree longitude = 111,111 meters * cosine(latitude)
+ :param lon: longitude to add meters to
+ :param lat: latitude to add meters to
+ :param meters: meters to add to the longitude and latitude values
+ :return: (longitude, latitude) increased by given meters
+ """
+ longitude = lon + ((meters / 111111) * cos(radians(lat)))
+ latitude = lat + (meters / 111111)
+
+ return longitude, latitude
+
+
+def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, tt_b, rt_b, platforms_b,
+ bounding_wkt_b, depth_min_b, depth_max_b):
+ the_time = datetime.now()
+ tile_ids = list(tile_ids)
+ if len(tile_ids) == 0:
+ return []
+ tile_service = NexusTileService()
+
+ # Determine the spatial temporal extents of this partition of tiles
+ tiles_bbox = tile_service.get_bounding_box(tile_ids)
+ tiles_min_time = tile_service.get_min_time(tile_ids)
+ tiles_max_time = tile_service.get_max_time(tile_ids)
+
+ # Increase spatial extents by the radius tolerance
+ matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1],
+ -1 * rt_b.value)
+ matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(tiles_bbox.bounds[2], tiles_bbox.bounds[3], rt_b.value)
+
+ # Don't go outside of the search domain
+ search_min_x, search_min_y, search_max_x, search_max_y = wkt.loads(bounding_wkt_b.value).bounds
+ matchup_min_lon = max(matchup_min_lon, search_min_x)
+ matchup_min_lat = max(matchup_min_lat, search_min_y)
+ matchup_max_lon = min(matchup_max_lon, search_max_x)
+ matchup_max_lat = min(matchup_max_lat, search_max_y)
+
+ # Find the centroid of the matchup bounding box and initialize the projections
+ matchup_center = box(matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat).centroid.coords[0]
+ aeqd_proj = pyproj.Proj(proj='aeqd', lon_0=matchup_center[0], lat_0=matchup_center[1])
+ lonlat_proj = pyproj.Proj(proj='lonlat')
+
+ # Increase temporal extents by the time tolerance
+ matchup_min_time = tiles_min_time - tt_b.value
+ matchup_max_time = tiles_max_time + tt_b.value
+ print "%s Time to determine spatial-temporal extents for partition %s to %s" % (
+ str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
+
+ # Query edge for all points within the spatial-temporal extents of this partition
+ the_time = datetime.now()
+ edge_session = requests.Session()
+ edge_results = []
+ with edge_session:
+ for insitudata_name in matchup_b.value.split(','):
+ bbox = ','.join(
+ [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)])
+ edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox,
+ platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session)
+ if edge_response['totalResults'] == 0:
+ continue
+ r = edge_response['results']
+ for p in r:
+ p['source'] = insitudata_name
+ edge_results.extend(r)
+ print "%s Time to call edge for partition %s to %s" % (str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
+ if len(edge_results) == 0:
+ return []
+
+ # Convert edge points to utm
+ the_time = datetime.now()
+ matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
+ for n, edge_point in enumerate(edge_results):
+ try:
+ x, y = wkt.loads(edge_point['point']).coords[0]
+ except ReadingError:
+ try:
+ x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
+ except ValueError:
+ y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
+
+ matchup_points[n][0], matchup_points[n][1] = pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=x, y=y)
+ print "%s Time to convert match points for partition %s to %s" % (
+ str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
+
+ # Build kdtree from matchup points
+ the_time = datetime.now()
+ m_tree = spatial.cKDTree(matchup_points, leafsize=30)
+ print "%s Time to build matchup tree" % (str(datetime.now() - the_time))
+
+ # The actual matching happens in the generator. This is so that we only load 1 tile into memory at a time
+ match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value,
+ parameter_b.value, rt_b.value, lonlat_proj, aeqd_proj) for tile_id
+ in tile_ids]
+
+ return chain(*match_generators)
+
+
+def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt,
+ search_parameter, radius_tolerance, lonlat_proj, aeqd_proj):
+ from nexustiles.model.nexusmodel import NexusPoint
+ from webservice.algorithms_spark.Matchup import DomsPoint # Must import DomsPoint or Spark complains
+
+ # Load tile
+ try:
+ the_time = datetime.now()
+ tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt),
+ tile_service.find_tile_by_id(tile_id))[0]
+ print "%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id)
+ except IndexError:
+ # This should only happen if all measurements in a tile become masked after applying the bounding polygon
+ raise StopIteration
+
+ # Convert valid tile lat,lon tuples to UTM tuples
+ the_time = datetime.now()
+ # Get list of indices of valid values
+ valid_indices = tile.get_indices()
+ primary_points = np.array(
+ [pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=tile.longitudes[aslice[2]], y=tile.latitudes[aslice[1]]) for
+ aslice in valid_indices])
+
+ print "%s Time to convert primary points for tile %s" % (str(datetime.now() - the_time), tile_id)
+
+ a_time = datetime.now()
+ p_tree = spatial.cKDTree(primary_points, leafsize=30)
+ print "%s Time to build primary tree" % (str(datetime.now() - a_time))
+
+ a_time = datetime.now()
+ matched_indexes = p_tree.query_ball_tree(m_tree, radius_tolerance)
+ print "%s Time to query primary tree for tile %s" % (str(datetime.now() - a_time), tile_id)
+ for i, point_matches in enumerate(matched_indexes):
+ if len(point_matches) > 0:
+ p_nexus_point = NexusPoint(tile.latitudes[valid_indices[i][1]],
+ tile.longitudes[valid_indices[i][2]], None,
+ tile.times[valid_indices[i][0]], valid_indices[i],
+ tile.data[tuple(valid_indices[i])])
+ p_doms_point = DomsPoint.from_nexus_point(p_nexus_point, tile=tile, parameter=search_parameter)
+ for m_point_index in point_matches:
+ m_doms_point = DomsPoint.from_edge_point(edge_results[m_point_index])
+ yield p_doms_point, m_doms_point
+
+
+def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, itemsPerPage=1000,
+ startIndex=0, stats=True, session=None):
+ try:
+ startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+ except TypeError:
+ # Assume we were passed a properly formatted string
+ pass
+
+ try:
+ endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+ except TypeError:
+ # Assume we were passed a properly formatted string
+ pass
+
+ try:
+ platform = platform.split(',')
+ except AttributeError:
+ # Assume we were passed a list
+ pass
+
+ params = {"variable": variable,
+ "startTime": startTime,
+ "endTime": endTime,
+ "bbox": bbox,
+ "minDepth": depth_min,
+ "maxDepth": depth_max,
+ "platform": platform,
+ "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
+
+ if session is not None:
+ edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+ else:
+ edge_request = requests.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+
+ edge_request.raise_for_status()
+ edge_response = json.loads(edge_request.text)
+
+ # Get all edge results
+ next_page_url = edge_response.get('next', None)
+ while next_page_url is not None:
+ if session is not None:
+ edge_page_request = session.get(next_page_url)
+ else:
+ edge_page_request = requests.get(next_page_url)
+
+ edge_page_request.raise_for_status()
+ edge_page_response = json.loads(edge_page_request.text)
+
+ edge_response['results'].extend(edge_page_response['results'])
+
+ next_page_url = edge_page_response.get('next', None)
+
+ return edge_response
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
new file mode 100644
index 0000000..e332654
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -0,0 +1,248 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology. All rights reserved
+"""
+import logging
+
+import numpy as np
+from nexustiles.nexustiles import NexusTileService
+
+# from time import time
+from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
+
+
+@nexus_handler
+class TimeAvgMapSparkHandlerImpl(SparkHandler):
+ name = "Time Average Map Spark"
+ path = "/timeAvgMapSpark"
+ description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range"
+ params = DEFAULT_PARAMETERS_SPEC
+ singleton = True
+
+ def __init__(self):
+ SparkHandler.__init__(self)
+ self.log = logging.getLogger(__name__)
+ # self.log.setLevel(logging.DEBUG)
+
+ @staticmethod
+ def _map(tile_in_spark):
+ tile_bounds = tile_in_spark[0]
+ (min_lat, max_lat, min_lon, max_lon,
+ min_y, max_y, min_x, max_x) = tile_bounds
+ startTime = tile_in_spark[1]
+ endTime = tile_in_spark[2]
+ ds = tile_in_spark[3]
+ tile_service = NexusTileService()
+ # print 'Started tile {0}'.format(tile_bounds)
+ # sys.stdout.flush()
+ tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+ # days_at_a_time = 90
+ days_at_a_time = 30
+ # days_at_a_time = 7
+ # days_at_a_time = 1
+ # print 'days_at_a_time = {0}'.format(days_at_a_time)
+ t_incr = 86400 * days_at_a_time
+ sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
+ cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
+ t_start = startTime
+ while t_start <= endTime:
+ t_end = min(t_start + t_incr, endTime)
+ # t1 = time()
+ # print 'nexus call start at time {0}'.format(t1)
+ # sys.stdout.flush()
+ # nexus_tiles = \
+ # TimeAvgMapSparkHandlerImpl.query_by_parts(tile_service,
+ # min_lat, max_lat,
+ # min_lon, max_lon,
+ # ds,
+ # t_start,
+ # t_end,
+ # part_dim=2)
+ nexus_tiles = \
+ tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
+ min_lon, max_lon,
+ ds=ds,
+ start_time=t_start,
+ end_time=t_end)
+ # t2 = time()
+ # print 'nexus call end at time %f' % t2
+ # print 'secs in nexus call: ', t2 - t1
+ # print 't %d to %d - Got %d tiles' % (t_start, t_end,
+ # len(nexus_tiles))
+ # for nt in nexus_tiles:
+ # print nt.granule
+ # print nt.section_spec
+ # print 'lat min/max:', np.ma.min(nt.latitudes), np.ma.max(nt.latitudes)
+ # print 'lon min/max:', np.ma.min(nt.longitudes), np.ma.max(nt.longitudes)
+ # sys.stdout.flush()
+
+ for tile in nexus_tiles:
+ tile.data.data[:, :] = np.nan_to_num(tile.data.data)
+ sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
+ cnt_tile += (~tile.data.mask[0,
+ min_y:max_y + 1,
+ min_x:max_x + 1]).astype(np.uint8)
+ t_start = t_end + 1
+
+ # print 'cnt_tile = ', cnt_tile
+ # cnt_tile.mask = ~(cnt_tile.data.astype(bool))
+ # sum_tile.mask = cnt_tile.mask
+ # avg_tile = sum_tile / cnt_tile
+ # stats_tile = [[{'avg': avg_tile.data[y,x], 'cnt': cnt_tile.data[y,x]} for x in range(tile_inbounds_shape[1])] for y in range(tile_inbounds_shape[0])]
+ # print 'Finished tile', tile_bounds
+ # print 'Tile avg = ', avg_tile
+ # sys.stdout.flush()
+ return ((min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile))
+
+ def calc(self, computeOptions, **args):
+ """
+
+ :param computeOptions: StatsComputeOptions
+ :param args: dict
+ :return:
+ """
+
+ spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
+ self._setQueryParams(computeOptions.get_dataset()[0],
+ (float(computeOptions.get_min_lat()),
+ float(computeOptions.get_max_lat()),
+ float(computeOptions.get_min_lon()),
+ float(computeOptions.get_max_lon())),
+ computeOptions.get_start_time(),
+ computeOptions.get_end_time(),
+ spark_master=spark_master,
+ spark_nexecs=spark_nexecs,
+ spark_nparts=spark_nparts)
+
+ if 'CLIM' in self._ds:
+ raise NexusProcessingException(
+ reason="Cannot compute Latitude/Longitude Time Average plot on a climatology", code=400)
+
+ nexus_tiles = self._find_global_tile_set()
+ # print 'tiles:'
+ # for tile in nexus_tiles:
+ # print tile.granule
+ # print tile.section_spec
+ # print 'lat:', tile.latitudes
+ # print 'lon:', tile.longitudes
+
+ # nexus_tiles)
+ if len(nexus_tiles) == 0:
+ raise NoDataException(reason="No data found for selected timeframe")
+
+ self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))
+
+ self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
+ nlats = int((self._maxLat - self._minLatCent) / self._latRes) + 1
+ nlons = int((self._maxLon - self._minLonCent) / self._lonRes) + 1
+ self.log.debug('nlats={0}, nlons={1}'.format(nlats, nlons))
+ self.log.debug('center lat range = {0} to {1}'.format(self._minLatCent,
+ self._maxLatCent))
+ self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent,
+ self._maxLonCent))
+
+ # for tile in nexus_tiles:
+ # print 'lats: ', tile.latitudes.compressed()
+ # print 'lons: ', tile.longitudes.compressed()
+ # Create array of tuples to pass to Spark map function
+ nexus_tiles_spark = [[self._find_tile_bounds(t),
+ self._startTime, self._endTime,
+ self._ds] for t in nexus_tiles]
+ # print 'nexus_tiles_spark = ', nexus_tiles_spark
+ # Remove empty tiles (should have bounds set to None)
+ bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
+ for i in np.flipud(bad_tile_inds):
+ del nexus_tiles_spark[i]
+
+ # Expand Spark map tuple array by duplicating each entry N times,
+ # where N is the number of ways we want the time dimension carved up.
+ num_time_parts = 72
+ # num_time_parts = 1
+ nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
+ self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
+
+ # Set the time boundaries for each of the Spark map tuples.
+ # Every Nth element in the array gets the same time bounds.
+ spark_part_times = np.linspace(self._startTime, self._endTime,
+ num_time_parts + 1, dtype=np.int64)
+
+ spark_part_time_ranges = \
+ np.repeat([[[spark_part_times[i],
+ spark_part_times[i + 1]] for i in range(num_time_parts)]],
+ len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2))
+ self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
+ nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
+ # print 'nexus_tiles_spark final = '
+ # for i in range(len(nexus_tiles_spark)):
+ # print nexus_tiles_spark[i]
+
+ # Launch Spark computations
+ rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+ sum_count_part = rdd.map(self._map)
+ sum_count = \
+ sum_count_part.combineByKey(lambda val: val,
+ lambda x, val: (x[0] + val[0],
+ x[1] + val[1]),
+ lambda x, y: (x[0] + y[0], x[1] + y[1]))
+ fill = self._fill
+ avg_tiles = \
+ sum_count.map(lambda (bounds, (sum_tile, cnt_tile)):
+ (bounds, [[{'avg': (sum_tile[y, x] / cnt_tile[y, x])
+ if (cnt_tile[y, x] > 0)
+ else fill,
+ 'cnt': cnt_tile[y, x]}
+ for x in
+ range(sum_tile.shape[1])]
+ for y in
+ range(sum_tile.shape[0])])).collect()
+
+ # Combine subset results to produce global map.
+ #
+ # The tiles below are NOT Nexus objects. They are tuples
+ # with the time avg map data and lat-lon bounding box.
+ a = np.zeros((nlats, nlons), dtype=np.float64, order='C')
+ n = np.zeros((nlats, nlons), dtype=np.uint32, order='C')
+ for tile in avg_tiles:
+ if tile is not None:
+ ((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon),
+ tile_stats) = tile
+ tile_data = np.ma.array(
+ [[tile_stats[y][x]['avg'] for x in range(len(tile_stats[0]))] for y in range(len(tile_stats))])
+ tile_cnt = np.array(
+ [[tile_stats[y][x]['cnt'] for x in range(len(tile_stats[0]))] for y in range(len(tile_stats))])
+ tile_data.mask = ~(tile_cnt.astype(bool))
+ y0 = self._lat2ind(tile_min_lat)
+ y1 = y0 + tile_data.shape[0] - 1
+ x0 = self._lon2ind(tile_min_lon)
+ x1 = x0 + tile_data.shape[1] - 1
+ if np.any(np.logical_not(tile_data.mask)):
+ self.log.debug(
+ 'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(tile_min_lat,
+ tile_max_lat,
+ tile_min_lon,
+ tile_max_lon, y0,
+ y1, x0, x1))
+ a[y0:y1 + 1, x0:x1 + 1] = tile_data
+ n[y0:y1 + 1, x0:x1 + 1] = tile_cnt
+ else:
+ self.log.debug(
+ 'All pixels masked in tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(
+ tile_min_lat, tile_max_lat,
+ tile_min_lon, tile_max_lon,
+ y0, y1, x0, x1))
+
+ # Store global map in a NetCDF file.
+ self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill)
+
+ # Create dict for JSON response
+ results = [[{'avg': a[y, x], 'cnt': int(n[y, x]),
+ 'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
+ for x in range(a.shape[1])] for y in range(a.shape[0])]
+
+ return TimeAvgMapSparkResults(results=results, meta={}, computeOptions=computeOptions)
+
+
+class TimeAvgMapSparkResults(NexusResults):
+ def __init__(self, results=None, meta=None, computeOptions=None):
+ NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions)