Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:40:23 UTC

[46/51] [partial] incubator-sdap-nexus git commit: SDAP-1 Import all code under the SDAP SGA

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
new file mode 100644
index 0000000..1f31267
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py
@@ -0,0 +1,391 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import logging
+from datetime import datetime
+
+import numpy as np
+import pytz
+from nexustiles.nexustiles import NexusTileService
+from shapely import wkt
+from shapely.geometry import Polygon
+
+from webservice.NexusHandler import nexus_handler, SparkHandler
+from webservice.webmodel import NexusResults, NexusProcessingException
+
+SENTINEL = 'STOP'
+
+EPOCH = pytz.timezone('UTC').localize(datetime(1970, 1, 1))
+
+
+def iso_time_to_epoch(str_time):
+    return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace(
+        tzinfo=pytz.UTC) - EPOCH).total_seconds()
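+    # e.g. iso_time_to_epoch("2016-01-01T00:00:00Z") == 1451606400.0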
+
+
+@nexus_handler
+class DailyDifferenceAverageSparkImpl(SparkHandler):
+    name = "Daily Difference Average Spark"
+    path = "/dailydifferenceaverage_spark"
+    description = "Subtracts data in box in Dataset 1 from Dataset 2, then averages the difference per day."
+    params = {
+        "dataset": {
+            "name": "Dataset",
+            "type": "string",
+            "description": "The Dataset shortname to use in calculation"
+        },
+        "climatology": {
+            "name": "Climatology",
+            "type": "string",
+            "description": "The Climatology shortname to use in calculation"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ"
+        }
+    }
+    singleton = True
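+
+    # Example request (illustrative parameter values only):
+    #   /dailydifferenceaverage_spark?dataset=MY_SST_DATASET&climatology=MY_SST_CLIMATOLOGY
+    #       &b=-150,40,-120,55&startTime=2016-01-01T00:00:00Z&endTime=2016-03-01T00:00:00Z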
+
+    def __init__(self):
+        SparkHandler.__init__(self, skipCassandra=True)
+        self.log = logging.getLogger(__name__)
+
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+        except:
+            try:
+                minLat = request.get_min_lat()
+                maxLat = request.get_max_lat()
+                minLon = request.get_min_lon()
+                maxLon = request.get_max_lon()
+                bounding_polygon = Polygon([(minLon, minLat),  # (west, south)
+                                            (maxLon, minLat),  # (east, south)
+                                            (maxLon, maxLat),  # (east, north)
+                                            (minLon, maxLat),  # (west, north)
+                                            (minLon, minLat)])  # (west, south)
+            except:
+                raise NexusProcessingException(
+                    reason="'b' argument or 'minLon', 'minLat', 'maxLon', and 'maxLat' arguments are required. If 'b' is used, it must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+                    code=400)
+        dataset = request.get_argument('dataset', None)
+        if dataset is None:
+            dataset = request.get_argument('ds1', None)
+        if dataset is None:
+            raise NexusProcessingException(reason="'dataset' or 'ds1' argument is required", code=400)
+        climatology = request.get_argument('climatology', None)
+        if climatology is None:
+            climatology = request.get_argument('ds2', None)
+        if climatology is None:
+            raise NexusProcessingException(reason="'climatology' or 'ds2' argument is required", code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+        plot = request.get_boolean_arg("plot", default=False)
+
+        return bounding_polygon, dataset, climatology, start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, plot
+
+    def calc(self, request, **args):
+        bounding_polygon, dataset, climatology, start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, plot = self.parse_arguments(
+            request)
+
+        self.log.debug("Querying for tiles in search domain")
+        # Get tile ids in box
+        tile_ids = [tile.tile_id for tile in
+                    self._tile_service.find_tiles_in_polygon(bounding_polygon, dataset,
+                                                             start_seconds_from_epoch, end_seconds_from_epoch,
+                                                             fetch_data=False, fl='id',
+                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+                                                                   'tile_min_lat asc'], rows=5000)]
+
+        # Call spark_matchup
+        self.log.debug("Calling Spark Driver")
+        try:
+            spark_result = spark_anomolies_driver(tile_ids, wkt.dumps(bounding_polygon), dataset, climatology,
+                                                  sc=self._sc)
+        except Exception as e:
+            self.log.exception(e)
+            raise NexusProcessingException(reason="An unknown error occurred while computing average differences",
+                                           code=500)
+
+        average_and_std_by_day = spark_result
+
+        result = DDAResult(
+            results=[[{'time': dayms, 'mean': avg_std[0], 'std': avg_std[1], 'ds': 0}] for dayms, avg_std in
+                     average_and_std_by_day],
+            stats={},
+            meta=self.get_meta(dataset))
+
+        min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds
+        result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, end_time)
+        return result
+
+    def get_meta(self, dataset):
+
+        # TODO yea this is broken
+        if 'sst' in dataset.lower():
+            meta = {
+                "title": "Sea Surface Temperature (SST) Anomalies",
+                "description": "SST anomalies are departures from the 5-day pixel mean",
+                "units": u'\u00B0C',
+                "label": u'Difference from 5-Day mean (\u00B0C)'
+            }
+        elif 'chl' in dataset.lower():
+            meta = {
+                "title": "Chlorophyll Concentration Anomalies",
+                "description": "Chlorophyll Concentration anomalies are departures from the 5-day pixel mean",
+                "units": u'mg m^-3',
+                "label": u'Difference from 5-Day mean (mg m^-3)'
+            }
+        elif 'sla' in dataset.lower():
+            meta = {
+                "title": "Sea Level Anomaly Estimate (SLA) Anomalies",
+                "description": "SLA anomalies are departures from the 5-day pixel mean",
+                "units": u'm',
+                "label": u'Difference from 5-Day mean (m)'
+            }
+        elif 'sss' in dataset.lower():
+            meta = {
+                "title": "Sea Surface Salinity (SSS) Anomalies",
+                "description": "SSS anomalies are departures from the 5-day pixel mean",
+                "units": u'psu',
+                "label": u'Difference from 5-Day mean (psu)'
+            }
+        elif 'ccmp' in dataset.lower():
+            meta = {
+                "title": "Wind Speed Anomalies",
+                "description": "Wind Speed anomalies are departures from the 1-day pixel mean",
+                "units": u'm/s',
+                "label": u'Difference from 1-Day mean (m/s)'
+            }
+        elif 'trmm' in dataset.lower():
+            meta = {
+                "title": "Precipitation Anomalies",
+                "description": "Precipitation anomalies are departures from the 5-day pixel mean",
+                "units": u'mm',
+                "label": u'Difference from 5-Day mean (mm)'
+            }
+        else:
+            meta = {
+                "title": "Anomalies",
+                "description": "Anomalies are departures from the 5-day pixel mean",
+                "units": u'',
+                "label": u'Difference from 5-Day mean'
+            }
+        return meta
+
+
+class DDAResult(NexusResults):
+    def toImage(self):
+        from StringIO import StringIO
+        import matplotlib.pyplot as plt
+        from matplotlib.dates import date2num
+
+        times = [date2num(datetime.fromtimestamp(dayavglistdict[0]['time'], pytz.utc).date()) for dayavglistdict in
+                 self.results()]
+        means = [dayavglistdict[0]['mean'] for dayavglistdict in self.results()]
+        plt.plot_date(times, means, '|g-')
+
+        plt.xlabel('Date')
+        plt.xticks(rotation=70)
+        plt.ylabel(u'Difference from 5-Day mean (\u00B0C)')
+        plt.title('Sea Surface Temperature (SST) Anomalies')
+        plt.grid(True)
+        plt.tight_layout()
+
+        sio = StringIO()
+        plt.savefig(sio, format='png')
+        return sio.getvalue()
+
+
+from threading import Lock
+from shapely.geometry import box
+
+DRIVER_LOCK = Lock()
+
+
+class NoClimatologyTile(Exception):
+    pass
+
+
+class NoDatasetTile(Exception):
+    pass
+
+
+def determine_parllelism(num_tiles):
+    """
+    Try to stay at a maximum of 1500 tiles per partition, but don't go over 128 partitions.
+    Also, don't go below the default of 8.
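+
+    For example (values follow directly from the formula below):
+        >>> determine_parllelism(1000)
+        8
+        >>> determine_parllelism(20000)
+        13
+        >>> determine_parllelism(500000)
+        128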
+    """
+    num_partitions = max(min(num_tiles // 1500, 128), 8)
+    return num_partitions
+
+
+def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None):
+    from functools import partial
+
+    with DRIVER_LOCK:
+        bounding_wkt_b = sc.broadcast(bounding_wkt)
+        dataset_b = sc.broadcast(dataset)
+        climatology_b = sc.broadcast(climatology)
+
+        # Parallelize list of tile ids
+        rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids)))
+
+    def add_tuple_elements(tuple1, tuple2):
+        cumulative_sum = tuple1[0] + tuple2[0]
+        cumulative_count = tuple1[1] + tuple2[1]
+
+        avg_1 = tuple1[0] / tuple1[1]
+        avg_2 = tuple2[0] / tuple2[1]
+
+        cumulative_var = parallel_variance(avg_1, tuple1[1], tuple1[2], avg_2, tuple2[1], tuple2[2])
+        return cumulative_sum, cumulative_count, cumulative_var
+
+    def parallel_variance(avg_a, count_a, var_a, avg_b, count_b, var_b):
+        # Thanks Wikipedia https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
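+        # Concrete check: merging [1, 2, 3] (mean 2.0, n=3, sample variance 1.0)
+        # with [4, 6] (mean 5.0, n=2, sample variance 2.0) should give 3.7,
+        # i.e. np.var([1, 2, 3, 4, 6], ddof=1).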
+        delta = avg_b - avg_a
+        m_a = var_a * (count_a - 1)
+        m_b = var_b * (count_b - 1)
+        M2 = m_a + m_b + delta ** 2 * count_a * count_b / (count_a + count_b)
+        return M2 / (count_a + count_b - 1)
+
+    def compute_avg_and_std(sum_cnt_var_tuple):
+        return sum_cnt_var_tuple[0] / sum_cnt_var_tuple[1], np.sqrt(sum_cnt_var_tuple[2])
+
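+    # Each tile partition yields (day_in_seconds, (sum, count, variance)) pairs;
+    # reduceByKey merges the per-tile statistics for each day with
+    # add_tuple_elements, mapValues converts them to (mean, std), and the
+    # per-day results are collected back to the driver sorted by day.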
+    result = rdd \
+        .mapPartitions(partial(calculate_diff, bounding_wkt=bounding_wkt_b, dataset=dataset_b,
+                               climatology=climatology_b)) \
+        .reduceByKey(add_tuple_elements) \
+        .mapValues(compute_avg_and_std) \
+        .sortByKey() \
+        .collect()
+
+    return result
+
+
+def calculate_diff(tile_ids, bounding_wkt, dataset, climatology):
+    from itertools import chain
+
+    # Construct a list of generators that yield (day, sum, count, variance)
+    diff_generators = []
+
+    tile_ids = list(tile_ids)
+    if len(tile_ids) == 0:
+        return []
+    tile_service = NexusTileService()
+
+    for tile_id in tile_ids:
+        # Get the dataset tile
+        try:
+            dataset_tile = get_dataset_tile(tile_service, wkt.loads(bounding_wkt.value), tile_id)
+        except NoDatasetTile:
+            # This should only happen if all measurements in a tile become masked after applying the bounding polygon
+            continue
+
+        tile_day_of_year = dataset_tile.min_time.timetuple().tm_yday
+
+        # Get the climatology tile
+        try:
+            climatology_tile = get_climatology_tile(tile_service, wkt.loads(bounding_wkt.value),
+                                                    box(dataset_tile.bbox.min_lon,
+                                                        dataset_tile.bbox.min_lat,
+                                                        dataset_tile.bbox.max_lon,
+                                                        dataset_tile.bbox.max_lat),
+                                                    climatology.value,
+                                                    tile_day_of_year)
+        except NoClimatologyTile:
+            continue
+
+        diff_generators.append(generate_diff(dataset_tile, climatology_tile))
+
+    return chain(*diff_generators)
+
+
+def get_dataset_tile(tile_service, search_bounding_shape, tile_id):
+    the_time = datetime.now()
+
+    try:
+        # Load the dataset tile
+        dataset_tile = tile_service.find_tile_by_id(tile_id)[0]
+        # Mask it to the search domain
+        dataset_tile = tile_service.mask_tiles_to_polygon(search_bounding_shape, [dataset_tile])[0]
+    except IndexError:
+        raise NoDatasetTile()
+
+    print "%s Time to load dataset tile %s" % (str(datetime.now() - the_time), dataset_tile.tile_id)
+    return dataset_tile
+
+
+def get_climatology_tile(tile_service, search_bounding_shape, tile_bounding_shape, climatology_dataset,
+                         tile_day_of_year):
+    the_time = datetime.now()
+    try:
+        # Load the tile
+        climatology_tile = tile_service.find_tile_by_polygon_and_most_recent_day_of_year(
+            tile_bounding_shape,
+            climatology_dataset,
+            tile_day_of_year)[0]
+
+    except IndexError:
+        raise NoClimatologyTile()
+
+    if search_bounding_shape.contains(tile_bounding_shape):
+        # The tile is totally contained in the search area, we don't need to mask it.
+        pass
+    else:
+        # The tile is not totally contained in the search area,
+        #     we need to mask the data to the search domain.
+        try:
+            # Mask it to the search domain
+            climatology_tile = tile_service.mask_tiles_to_polygon(search_bounding_shape, [climatology_tile])[0]
+        except IndexError:
+            raise NoClimatologyTile()
+
+    print "%s Time to load climatology tile %s" % (str(datetime.now() - the_time), climatology_tile.tile_id)
+    return climatology_tile
+
+
+def generate_diff(data_tile, climatology_tile):
+    the_time = datetime.now()
+
+    diff = np.subtract(data_tile.data, climatology_tile.data)
+    diff_sum = np.nansum(diff)
+    diff_var = np.nanvar(diff)
+    diff_ct = np.ma.count(diff)
+
+    date_in_seconds = int((datetime.combine(data_tile.min_time.date(), datetime.min.time()).replace(
+        tzinfo=pytz.UTC) - EPOCH).total_seconds())
+
+    print "%s Time to generate diff between %s and %s" % (
+        str(datetime.now() - the_time), data_tile.tile_id, climatology_tile.tile_id)
+
+    yield (date_in_seconds, (diff_sum, diff_ct, diff_var))

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/HofMoellerSpark.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
new file mode 100644
index 0000000..f6a0118
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -0,0 +1,331 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import itertools
+import logging
+import traceback
+from cStringIO import StringIO
+from datetime import datetime
+from multiprocessing.dummy import Pool, Manager
+
+import matplotlib.pyplot as plt
+import mpld3
+import numpy as np
+from nexustiles.nexustiles import NexusTileService
+from matplotlib import cm
+from matplotlib.ticker import FuncFormatter
+
+from webservice.NexusHandler import SparkHandler, nexus_handler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusProcessingException, NexusResults
+
+SENTINEL = 'STOP'
+LATITUDE = 0
+LONGITUDE = 1
+
+
+class LongitudeHofMoellerCalculator(object):
+    @staticmethod
+    def longitude_time_hofmoeller_stats(tile_in_spark):
+
+        (tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark
+
+        tile_service = NexusTileService()
+        try:
+            # Load the dataset tile
+            tile = tile_service.find_tile_by_id(tile_id)[0]
+            # Mask it to the search domain
+            tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, [tile])[0]
+        except IndexError:
+            return None
+
+        stat = {
+            'sequence': index,
+            'time': np.ma.min(tile.times),
+            'lons': []
+        }
+        points = list(tile.nexus_point_generator())
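+        # itertools.groupby only groups consecutive items, so the points must be
+        # sorted by longitude before grouping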
+        data = sorted(points, key=lambda p: p.longitude)
+        points_by_lon = itertools.groupby(data, key=lambda p: p.longitude)
+
+        for lon, points_at_lon in points_by_lon:
+            values_at_lon = np.array([point.data_val for point in points_at_lon])
+            stat['lons'].append({
+                'longitude': float(lon),
+                'cnt': len(values_at_lon),
+                'avg': np.mean(values_at_lon).item(),
+                'max': np.max(values_at_lon).item(),
+                'min': np.min(values_at_lon).item(),
+                'std': np.std(values_at_lon).item()
+            })
+
+        return stat
+
+
+class LatitudeHofMoellerCalculator(object):
+    @staticmethod
+    def latitude_time_hofmoeller_stats(tile_in_spark):
+
+        (tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark
+
+        tile_service = NexusTileService()
+        try:
+            # Load the dataset tile
+            tile = tile_service.find_tile_by_id(tile_id)[0]
+            # Mask it to the search domain
+            tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, [tile])[0]
+        except IndexError:
+            return None
+
+        stat = {
+            'sequence': index,
+            'time': np.ma.min(tile.times),
+            'lats': []
+        }
+
+        points = list(tile.nexus_point_generator())
+        data = sorted(points, key=lambda p: p.latitude)
+        points_by_lat = itertools.groupby(data, key=lambda p: p.latitude)
+
+        for lat, points_at_lat in points_by_lat:
+            values_at_lat = np.array([point.data_val for point in points_at_lat])
+
+            stat['lats'].append({
+                'latitude': float(lat),
+                'cnt': len(values_at_lat),
+                'avg': np.mean(values_at_lat).item(),
+                'max': np.max(values_at_lat).item(),
+                'min': np.min(values_at_lat).item(),
+                'std': np.std(values_at_lat).item()
+            })
+
+        return stat
+
+
+class BaseHoffMoellerHandlerImpl(SparkHandler):
+    def __init__(self):
+        SparkHandler.__init__(self)
+        self.log = logging.getLogger(__name__)
+
+    def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True):
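+        # Remove the seasonal cycle from `field`: for each month-of-year index
+        # (0-11), average the field over the pivot axis and over all years, then
+        # subtract that average from every entry belonging to that month. The
+        # stride of 12 assumes one entry per month in time order; series of a
+        # year or less are returned unchanged. The de-seasoned value is stored
+        # under "<field>Seasonal" alongside the original.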
+        shape = (len(results), len(results[0][pivot]))
+        if shape[0] <= 12:
+            return results
+        for a in range(0, 12):
+            values = []
+            for b in range(a, len(results), 12):
+                values.append(np.average([l[field] for l in results[b][pivot]]))
+            avg = np.average(values)
+
+            for b in range(a, len(results), 12):
+                for l in results[b][pivot]:
+                    l["%sSeasonal" % field] = l[field] - avg
+
+        return results
+
+    def applyDeseasonToHofMoeller(self, results, pivot="lats", append=True):
+        results = self.applyDeseasonToHofMoellerByField(results, pivot, field="avg", append=append)
+        results = self.applyDeseasonToHofMoellerByField(results, pivot, field="min", append=append)
+        results = self.applyDeseasonToHofMoellerByField(results, pivot, field="max", append=append)
+        return results
+
+def determine_parllelism(num_tiles):
+    """
+    Try to stay at a maximum of 1500 tiles per partition, but don't go over 128 partitions.
+    Also, don't go below the default of 8.
+    """
+    num_partitions = max(min(num_tiles // 1500, 128), 8)
+    return num_partitions
+
+@nexus_handler
+class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
+    name = "Latitude/Time HofMoeller Spark"
+    path = "/latitudeTimeHofMoellerSpark"
+    description = "Computes a latitude/time HofMoeller plot given an arbitrary geographical area and time range"
+    params = DEFAULT_PARAMETERS_SPEC
+    singleton = True
+
+    def __init__(self):
+        BaseHoffMoellerHandlerImpl.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        min_lat, max_lat = computeOptions.get_min_lat(), computeOptions.get_max_lat()
+        min_lon, max_lon = computeOptions.get_min_lon(), computeOptions.get_max_lon()
+        tiles = self._tile_service.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
+                                                     computeOptions.get_dataset()[0],
+                                                     computeOptions.get_start_time(),
+                                                     computeOptions.get_end_time(), fetch_data=False)
+        nexus_tiles_spark = [(tile.tile_id, x, min_lat, max_lat, min_lon, max_lon)
+                             for x, tile in enumerate(tiles)]
+
+        if len(nexus_tiles_spark) == 0:
+            raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
+
+        # Parallelize list of tile ids
+        rdd = self._sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
+        results = rdd.map(LatitudeHofMoellerCalculator.latitude_time_hofmoeller_stats).collect()
+
+        results = filter(None, results)
+        results = sorted(results, key=lambda entry: entry["time"])
+
+        results = self.applyDeseasonToHofMoeller(results)
+
+        result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LATITUDE)
+        return result
+
+
+@nexus_handler
+class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
+    name = "Longitude/Time HofMoeller Spark"
+    path = "/longitudeTimeHofMoellerSpark"
+    description = "Computes a longitude/time HofMoeller plot given an arbitrary geographical area and time range"
+    params = DEFAULT_PARAMETERS_SPEC
+    singleton = True
+
+    def __init__(self):
+        BaseHoffMoellerHandlerImpl.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        min_lat, max_lat = computeOptions.get_min_lat(), computeOptions.get_max_lat()
+        min_lon, max_lon = computeOptions.get_min_lon(), computeOptions.get_max_lon()
+        tiles = self._tile_service.find_tiles_in_box(min_lat, max_lat, min_lon, max_lon,
+                                                     computeOptions.get_dataset()[0],
+                                                     computeOptions.get_start_time(),
+                                                     computeOptions.get_end_time(), fetch_data=False)
+        nexus_tiles_spark = [(tile.tile_id, x, min_lat, max_lat, min_lon, max_lon)
+                             for x, tile in enumerate(tiles)]
+
+        if len(nexus_tiles_spark) == 0:
+            raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
+
+        # Parallelize list of tile ids
+        rdd = self._sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
+        results = rdd.map(LongitudeHofMoellerCalculator.longitude_time_hofmoeller_stats).collect()
+
+        results = filter(None, results)
+        results = sorted(results, key=lambda entry: entry["time"])
+
+        results = self.applyDeseasonToHofMoeller(results, pivot="lons")
+
+        result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LONGITUDE)
+        return result
+
+
+class HoffMoellerResults(NexusResults):
+    LATITUDE = 0
+    LONGITUDE = 1
+
+    def __init__(self, results=None, meta=None, stats=None, computeOptions=None, **args):
+        NexusResults.__init__(self, results=results, meta=meta, stats=stats, computeOptions=computeOptions)
+        self.__type = args['type']
+
+    def createHoffmueller(self, data, coordSeries, timeSeries, coordName, title, interpolate='nearest'):
+        cmap = cm.coolwarm
+        # ls = LightSource(315, 45)
+        # rgb = ls.shade(data, cmap)
+
+        fig, ax = plt.subplots()
+        fig.set_size_inches(11.0, 8.5)
+        cax = ax.imshow(data, interpolation=interpolate, cmap=cmap)
+
+        def yFormatter(y, pos):
+            if y < len(coordSeries):
+                return "%s $^\circ$" % (int(coordSeries[int(y)] * 100.0) / 100.)
+            else:
+                return ""
+
+        def xFormatter(x, pos):
+            if x < len(timeSeries):
+                return timeSeries[int(x)].strftime('%b %Y')
+            else:
+                return ""
+
+        ax.xaxis.set_major_formatter(FuncFormatter(xFormatter))
+        ax.yaxis.set_major_formatter(FuncFormatter(yFormatter))
+
+        ax.set_title(title)
+        ax.set_ylabel(coordName)
+        ax.set_xlabel('Date')
+
+        fig.colorbar(cax)
+        fig.autofmt_xdate()
+
+        labels = ['point {0}'.format(i + 1) for i in range(len(data))]
+        # plugins.connect(fig, plugins.MousePosition(fontsize=14))
+        tooltip = mpld3.plugins.PointLabelTooltip(cax, labels=labels)
+
+        sio = StringIO()
+        plt.savefig(sio, format='png')
+        return sio.getvalue()
+
+    def createLongitudeHoffmueller(self, res, meta):
+        lonSeries = [m['longitude'] for m in res[0]['lons']]
+        timeSeries = [datetime.fromtimestamp(m['time'] / 1000) for m in res]
+
+        data = np.zeros((len(lonSeries), len(timeSeries)))
+
+        plotSeries = self.computeOptions().get_plot_series(default="avg") if self.computeOptions() is not None else None
+        if plotSeries is None:
+            plotSeries = "avg"
+
+        for t in range(0, len(timeSeries)):
+            timeSet = res[t]
+            for l in range(0, len(lonSeries)):
+                latSet = timeSet['lons'][l]
+                value = latSet[plotSeries]
+                data[len(lonSeries) - l - 1][t] = value
+
+        title = meta['title']
+        source = meta['source']
+        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+        return self.createHoffmueller(data, lonSeries, timeSeries, "Longitude",
+                                      "%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest')
+
+    def createLatitudeHoffmueller(self, res, meta):
+        latSeries = [m['latitude'] for m in res[0]['lats']]
+        timeSeries = [datetime.fromtimestamp(m['time'] / 1000) for m in res]
+
+        data = np.zeros((len(latSeries), len(timeSeries)))
+
+        plotSeries = self.computeOptions().get_plot_series(default="avg") if self.computeOptions() is not None else None
+        if plotSeries is None:
+            plotSeries = "avg"
+
+        for t in range(0, len(timeSeries)):
+            timeSet = res[t]
+            for l in range(0, len(latSeries)):
+                latSet = timeSet['lats'][l]
+                value = latSet[plotSeries]
+                data[len(latSeries) - l - 1][t] = value
+
+        title = meta['title']
+        source = meta['source']
+        dateRange = "%s - %s" % (timeSeries[0].strftime('%b %Y'), timeSeries[-1].strftime('%b %Y'))
+
+        return self.createHoffmueller(data, latSeries, timeSeries, "Latitude",
+                                      title="%s\n%s\n%s" % (title, source, dateRange), interpolate='nearest')
+
+    def toImage(self):
+        res = self.results()
+        meta = self.meta()
+
+        if self.__type == HoffMoellerResults.LATITUDE:
+            return self.createLatitudeHoffmueller(res, meta)
+        elif self.__type == HoffMoellerResults.LONGITUDE:
+            return self.createLongitudeHoffmueller(res, meta)
+        else:
+            raise Exception("Unsupported HoffMoeller Plot Type")
+
+
+def pool_worker(type, work_queue, done_queue):
+    try:
+
+        if type == LATITUDE:
+            calculator = LatitudeHofMoellerCalculator()
+        elif type == LONGITUDE:
+            calculator = LongitudeHofMoellerCalculator()
+
+        for work in iter(work_queue.get, SENTINEL):
+            scifunction = work[0]
+            args = work[1:]
+            result = calculator.__getattribute__(scifunction)(*args)
+            done_queue.put(result)
+
+    except Exception as e:
+        e_str = traceback.format_exc(e)
+        done_queue.put({'error': e_str})

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/Matchup.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
new file mode 100644
index 0000000..8d90389
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -0,0 +1,691 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+
+import json
+import logging
+import threading
+import time
+from datetime import datetime
+from itertools import chain
+from math import cos, radians
+
+import numpy as np
+import pyproj
+import requests
+from nexustiles.nexustiles import NexusTileService
+from pytz import timezone, UTC
+from scipy import spatial
+from shapely import wkt
+from shapely.geometry import Point
+from shapely.geometry import box
+from shapely.geos import ReadingError
+
+from webservice.NexusHandler import SparkHandler, nexus_handler
+from webservice.algorithms.doms import config as edge_endpoints
+from webservice.algorithms.doms import values as doms_values
+from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults
+from webservice.algorithms.doms.ResultsStorage import ResultsStorage
+from webservice.webmodel import NexusProcessingException
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+def iso_time_to_epoch(str_time):
+    return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace(
+        tzinfo=UTC) - EPOCH).total_seconds()
+
+
+@nexus_handler
+class Matchup(SparkHandler):
+    name = "Matchup"
+    path = "/match_spark"
+    description = "Match measurements between two or more datasets"
+
+    params = {
+        "primary": {
+            "name": "Primary Dataset",
+            "type": "string",
+            "description": "The Primary dataset used to find matches for. Required"
+        },
+        "matchup": {
+            "name": "Match-Up Datasets",
+            "type": "comma-delimited string",
+            "description": "The Dataset(s) being searched for measurements that match the Primary. Required"
+        },
+        "parameter": {
+            "name": "Match-Up Parameter",
+            "type": "string",
+            "description": "The parameter of interest used for the match up. One of 'sst', 'sss', 'wind'. Required"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+        },
+        "depthMin": {
+            "name": "Minimum Depth",
+            "type": "float",
+            "description": "Minimum depth of measurements. Must be less than depthMax. Optional. Default: no limit"
+        },
+        "depthMax": {
+            "name": "Maximum Depth",
+            "type": "float",
+            "description": "Maximum depth of measurements. Must be greater than depthMin. Optional. Default: no limit"
+        },
+        "tt": {
+            "name": "Time Tolerance",
+            "type": "long",
+            "description": "Tolerance in time (seconds) when comparing two measurements. Optional. Default: 86400"
+        },
+        "rt": {
+            "name": "Radius Tolerance",
+            "type": "float",
+            "description": "Tolerance in radius (meters) when comparing two measurements. Optional. Default: 1000"
+        },
+        "platforms": {
+            "name": "Platforms",
+            "type": "comma-delimited integer",
+            "description": "Platforms to include for matchup consideration. Required"
+        },
+        "matchOnce": {
+            "name": "Match Once",
+            "type": "boolean",
+            "description": "Optional True/False flag used to determine if more than one match per primary point is returned. "
+                           + "If true, only the nearest point will be returned for each primary point. "
+                           + "If false, all points within the tolerances will be returned for each primary point. Default: False"
+        },
+        "resultSizeLimit": {
+            "name": "Result Size Limit",
+            "type": "int",
+            "description": "Optional integer value that limits the number of results returned from the matchup. "
+                           "If the number of primary matches is greater than this limit, the service will respond with "
+                           "(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. "
+                           "Default: 500"
+        }
+    }
+    singleton = True
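+
+    # Example request (illustrative parameter values only):
+    #   /match_spark?primary=MY_SAT_SST_DATASET&matchup=MY_INSITU_DATASET&parameter=sst
+    #       &startTime=2016-01-01T00:00:00Z&endTime=2016-02-01T00:00:00Z
+    #       &b=-150,40,-120,55&platforms=1,2,3,4&tt=86400&rt=1000&matchOnce=true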
+
+    def __init__(self):
+        SparkHandler.__init__(self, skipCassandra=True)
+        self.log = logging.getLogger(__name__)
+
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+        except:
+            raise NexusProcessingException(
+                reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+                code=400)
+        primary_ds_name = request.get_argument('primary', None)
+        if primary_ds_name is None:
+            raise NexusProcessingException(reason="'primary' argument is required", code=400)
+        matchup_ds_names = request.get_argument('matchup', None)
+        if matchup_ds_names is None:
+            raise NexusProcessingException(reason="'matchup' argument is required", code=400)
+
+        parameter_s = request.get_argument('parameter', 'sst')
+        if parameter_s not in ['sst', 'sss', 'wind']:
+            raise NexusProcessingException(
+                reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        depth_min = request.get_decimal_arg('depthMin', default=None)
+        depth_max = request.get_decimal_arg('depthMax', default=None)
+
+        if depth_min is not None and depth_max is not None and depth_min >= depth_max:
+            raise NexusProcessingException(
+                reason="Depth Min should be less than Depth Max", code=400)
+
+        time_tolerance = request.get_int_arg('tt', default=86400)
+        radius_tolerance = request.get_decimal_arg('rt', default=1000.0)
+        platforms = request.get_argument('platforms', None)
+        if platforms is None:
+            raise NexusProcessingException(reason="'platforms' argument is required", code=400)
+        try:
+            p_validation = platforms.split(',')
+            p_validation = [int(p) for p in p_validation]
+            del p_validation
+        except:
+            raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
+
+        match_once = request.get_boolean_arg("matchOnce", default=False)
+
+        result_size_limit = request.get_int_arg("resultSizeLimit", default=500)
+
+        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+        return bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
+               start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
+               depth_min, depth_max, time_tolerance, radius_tolerance, \
+               platforms, match_once, result_size_limit
+
+    def calc(self, request, **args):
+        start = datetime.utcnow()
+        # TODO Assuming Satellite primary
+        bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
+        start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
+        depth_min, depth_max, time_tolerance, radius_tolerance, \
+        platforms, match_once, result_size_limit = self.parse_arguments(request)
+
+        with ResultsStorage() as resultsStorage:
+
+            execution_id = str(resultsStorage.insertExecution(None, start, None, None))
+
+        self.log.debug("Querying for tiles in search domain")
+        # Get tile ids in box
+        tile_ids = [tile.tile_id for tile in
+                    self._tile_service.find_tiles_in_polygon(bounding_polygon, primary_ds_name,
+                                                             start_seconds_from_epoch, end_seconds_from_epoch,
+                                                             fetch_data=False, fl='id',
+                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+                                                                   'tile_min_lat asc'], rows=5000)]
+
+        # Call spark_matchup
+        self.log.debug("Calling Spark Driver")
+        try:
+            spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
+                                                matchup_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
+                                                radius_tolerance, platforms, match_once, sc=self._sc)
+        except Exception as e:
+            self.log.exception(e)
+            raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
+
+        end = datetime.utcnow()
+
+        self.log.debug("Building and saving results")
+        args = {
+            "primary": primary_ds_name,
+            "matchup": matchup_ds_names,
+            "startTime": start_time,
+            "endTime": end_time,
+            "bbox": request.get_argument('b'),
+            "timeTolerance": time_tolerance,
+            "radiusTolerance": float(radius_tolerance),
+            "platforms": platforms,
+            "parameter": parameter_s
+        }
+
+        if depth_min is not None:
+            args["depthMin"] = float(depth_min)
+
+        if depth_max is not None:
+            args["depthMax"] = float(depth_max)
+
+        total_keys = len(spark_result.keys())
+        total_values = sum(len(v) for v in spark_result.itervalues())
+        details = {
+            "timeToComplete": int((end - start).total_seconds()),
+            "numInSituRecords": 0,
+            "numInSituMatched": total_values,
+            "numGriddedChecked": 0,
+            "numGriddedMatched": total_keys
+        }
+
+        matches = Matchup.convert_to_matches(spark_result)
+
+        def do_result_insert():
+            with ResultsStorage() as storage:
+                storage.insertResults(results=matches, params=args, stats=details,
+                                      startTime=start, completeTime=end, userEmail="",
+                                      execution_id=execution_id)
+
+        threading.Thread(target=do_result_insert).start()
+
+        if 0 < result_size_limit < len(matches):
+            result = DomsQueryResults(results=None, args=args, details=details, bounds=None, count=None,
+                                      computeOptions=None, executionId=execution_id, status_code=202)
+        else:
+            result = DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None,
+                                      computeOptions=None, executionId=execution_id)
+
+        return result
+
+    @classmethod
+    def convert_to_matches(cls, spark_result):
+        matches = []
+        for primary_domspoint, matched_domspoints in spark_result.iteritems():
+            p_matched = [cls.domspoint_to_dict(p_match) for p_match in matched_domspoints]
+
+            primary = cls.domspoint_to_dict(primary_domspoint)
+            primary['matches'] = list(p_matched)
+            matches.append(primary)
+        return matches
+
+    @staticmethod
+    def domspoint_to_dict(domspoint):
+        return {
+            "sea_water_temperature": domspoint.sst,
+            "sea_water_temperature_depth": domspoint.sst_depth,
+            "sea_water_salinity": domspoint.sss,
+            "sea_water_salinity_depth": domspoint.sss_depth,
+            "wind_speed": domspoint.wind_speed,
+            "wind_direction": domspoint.wind_direction,
+            "wind_u": domspoint.wind_u,
+            "wind_v": domspoint.wind_v,
+            "platform": doms_values.getPlatformById(domspoint.platform),
+            "device": doms_values.getDeviceById(domspoint.device),
+            "x": str(domspoint.longitude),
+            "y": str(domspoint.latitude),
+            "point": "Point(%s %s)" % (domspoint.longitude, domspoint.latitude),
+            "time": datetime.strptime(domspoint.time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC),
+            "fileurl": domspoint.file_url,
+            "id": domspoint.data_id,
+            "source": domspoint.source,
+        }
+
+
+class DomsPoint(object):
+    def __init__(self, longitude=None, latitude=None, time=None, depth=None, data_id=None):
+
+        self.time = time
+        self.longitude = longitude
+        self.latitude = latitude
+        self.depth = depth
+        self.data_id = data_id
+
+        self.wind_u = None
+        self.wind_v = None
+        self.wind_direction = None
+        self.wind_speed = None
+        self.sst = None
+        self.sst_depth = None
+        self.sss = None
+        self.sss_depth = None
+        self.source = None
+        self.depth = None
+        self.platform = None
+        self.device = None
+        self.file_url = None
+
+    def __repr__(self):
+        return str(self.__dict__)
+
+    @staticmethod
+    def from_nexus_point(nexus_point, tile=None, parameter='sst'):
+        point = DomsPoint()
+
+        point.data_id = "%s[%s]" % (tile.tile_id, nexus_point.index)
+
+        # TODO Not an ideal solution; but it works for now.
+        if parameter == 'sst':
+            point.sst = nexus_point.data_val.item()
+        elif parameter == 'sss':
+            point.sss = nexus_point.data_val.item()
+        elif parameter == 'wind':
+            point.wind_u = nexus_point.data_val.item()
+            try:
+                point.wind_v = tile.meta_data['wind_v'][tuple(nexus_point.index)].item()
+            except (KeyError, IndexError):
+                pass
+            try:
+                point.wind_direction = tile.meta_data['wind_dir'][tuple(nexus_point.index)].item()
+            except (KeyError, IndexError):
+                pass
+            try:
+                point.wind_speed = tile.meta_data['wind_speed'][tuple(nexus_point.index)].item()
+            except (KeyError, IndexError):
+                pass
+        else:
+            raise NotImplementedError('%s not supported. Only sst, sss, and wind parameters are supported.' % parameter)
+
+        point.longitude = nexus_point.longitude.item()
+        point.latitude = nexus_point.latitude.item()
+
+        point.time = datetime.utcfromtimestamp(nexus_point.time).strftime('%Y-%m-%dT%H:%M:%SZ')
+
+        try:
+            point.depth = nexus_point.depth
+        except KeyError:
+            # No depth associated with this measurement
+            pass
+
+        point.sst_depth = 0
+        point.source = tile.dataset
+        point.file_url = tile.granule
+
+        # TODO device should change based on the satellite making the observations.
+        point.platform = 9
+        point.device = 5
+        return point
+
+    @staticmethod
+    def from_edge_point(edge_point):
+        point = DomsPoint()
+
+        try:
+            x, y = wkt.loads(edge_point['point']).coords[0]
+        except ReadingError:
+            try:
+                x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
+            except ValueError:
+                y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
+
+        point.longitude = x
+        point.latitude = y
+
+        point.time = edge_point['time']
+
+        point.wind_u = edge_point.get('eastward_wind')
+        point.wind_v = edge_point.get('northward_wind')
+        point.wind_direction = edge_point.get('wind_direction')
+        point.wind_speed = edge_point.get('wind_speed')
+        point.sst = edge_point.get('sea_water_temperature')
+        point.sst_depth = edge_point.get('sea_water_temperature_depth')
+        point.sss = edge_point.get('sea_water_salinity')
+        point.sss_depth = edge_point.get('sea_water_salinity_depth')
+        point.source = edge_point.get('source')
+        point.platform = edge_point.get('platform')
+        point.device = edge_point.get('device')
+        point.file_url = edge_point.get('fileurl')
+
+        try:
+            point.data_id = unicode(edge_point['id'])
+        except KeyError:
+            point.data_id = "%s:%s:%s" % (point.time, point.longitude, point.latitude)
+
+        return point
+
+
+from threading import Lock
+
+DRIVER_LOCK = Lock()
+
+
+def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_names, parameter, depth_min, depth_max,
+                         time_tolerance, radius_tolerance, platforms, match_once, sc=None):
+    from functools import partial
+
+    with DRIVER_LOCK:
+        # Broadcast parameters
+        primary_b = sc.broadcast(primary_ds_name)
+        matchup_b = sc.broadcast(matchup_ds_names)
+        depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None)
+        depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None)
+        tt_b = sc.broadcast(time_tolerance)
+        rt_b = sc.broadcast(float(radius_tolerance))
+        platforms_b = sc.broadcast(platforms)
+        bounding_wkt_b = sc.broadcast(bounding_wkt)
+        parameter_b = sc.broadcast(parameter)
+
+        # Parallelize list of tile ids
+        rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids)))
+
+    # Map Partitions ( list(tile_id) )
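+    # Each partition produces (primary DomsPoint, matchup DomsPoint) pairs; the
+    # spatial tolerance is enforced by the kd-tree radius query inside
+    # match_satellite_to_insitu, and the filter below drops pairs whose times
+    # differ by more than the time tolerance.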
+    rdd_filtered = rdd.mapPartitions(
+        partial(match_satellite_to_insitu, primary_b=primary_b, matchup_b=matchup_b, parameter_b=parameter_b, tt_b=tt_b,
+                rt_b=rt_b, platforms_b=platforms_b, bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b,
+                depth_max_b=depth_max_b), preservesPartitioning=True) \
+        .filter(lambda p_m_tuple: abs(
+        iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)) <= time_tolerance)
+
+    if match_once:
+        # Only the 'nearest' point for each primary should be returned. Add an extra map/reduce which calculates
+        # the distance and finds the minimum
+
+        # Method used for calculating the distance between 2 DomsPoints
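+        # (for reference, two points one degree of longitude apart on the equator
+        # are roughly 111,319.5 m apart on the WGS84 ellipsoid)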
+        from pyproj import Geod
+
+        def dist(primary, matchup):
+            wgs84_geod = Geod(ellps='WGS84')
+            lat1, lon1 = (primary.latitude, primary.longitude)
+            lat2, lon2 = (matchup.latitude, matchup.longitude)
+            az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2)
+            return distance
+
+        rdd_filtered = rdd_filtered \
+            .map(lambda (primary, matchup): tuple([primary, tuple([matchup, dist(primary, matchup)])])) \
+            .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \
+            .mapValues(lambda x: [x[0]])
+    else:
+        rdd_filtered = rdd_filtered \
+            .combineByKey(lambda value: [value],  # Create 1 element list
+                          lambda value_list, value: value_list + [value],  # Add 1 element to list
+                          lambda value_list_a, value_list_b: value_list_a + value_list_b)  # Add two lists together
+
+    result_as_map = rdd_filtered.collectAsMap()
+
+    return result_as_map
+
+
+def determine_parllelism(num_tiles):
+    """
+    Try to stay at a maximum of 140 tiles per partition, but don't go over 128 partitions.
+    Also, don't go below the default of 8.
+    """
+    num_partitions = max(min(num_tiles / 140, 128), 8)
+    return num_partitions
+
+
+def add_meters_to_lon_lat(lon, lat, meters):
+    """
+    Uses a simple approximation of
+    1 degree latitude = 111,111 meters
+    1 degree longitude = 111,111 meters * cosine(latitude)
+    :param lon: longitude to add meters to
+    :param lat: latitude to add meters to
+    :param meters: meters to add to the longitude and latitude values
+    :return: (longitude, latitude) increased by given meters
+    """
+    longitude = lon + ((meters / 111111) * cos(radians(lat)))
+    latitude = lat + (meters / 111111)
+
+    return longitude, latitude
+
+
+def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, tt_b, rt_b, platforms_b,
+                              bounding_wkt_b, depth_min_b, depth_max_b):
+    the_time = datetime.now()
+    tile_ids = list(tile_ids)
+    if len(tile_ids) == 0:
+        return []
+    tile_service = NexusTileService()
+
+    # Determine the spatial temporal extents of this partition of tiles
+    tiles_bbox = tile_service.get_bounding_box(tile_ids)
+    tiles_min_time = tile_service.get_min_time(tile_ids)
+    tiles_max_time = tile_service.get_max_time(tile_ids)
+
+    # Increase spatial extents by the radius tolerance
+    matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1],
+                                                             -1 * rt_b.value)
+    matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(tiles_bbox.bounds[2], tiles_bbox.bounds[3], rt_b.value)
+
+    # Don't go outside of the search domain
+    search_min_x, search_min_y, search_max_x, search_max_y = wkt.loads(bounding_wkt_b.value).bounds
+    matchup_min_lon = max(matchup_min_lon, search_min_x)
+    matchup_min_lat = max(matchup_min_lat, search_min_y)
+    matchup_max_lon = min(matchup_max_lon, search_max_x)
+    matchup_max_lat = min(matchup_max_lat, search_max_y)
+
+    # Find the centroid of the matchup bounding box and initialize the projections
+    matchup_center = box(matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat).centroid.coords[0]
+    aeqd_proj = pyproj.Proj(proj='aeqd', lon_0=matchup_center[0], lat_0=matchup_center[1])
+    lonlat_proj = pyproj.Proj(proj='lonlat')
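+    # (an azimuthal equidistant projection centered on the matchup box keeps
+    # straight-line distances near the center close to true geodesic distances,
+    # so the kd-tree radius queries below can work in plain meters)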
+
+    # Increase temporal extents by the time tolerance
+    matchup_min_time = tiles_min_time - tt_b.value
+    matchup_max_time = tiles_max_time + tt_b.value
+    print "%s Time to determine spatial-temporal extents for partition %s to %s" % (
+        str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
+
+    # Query edge for all points within the spatial-temporal extents of this partition
+    the_time = datetime.now()
+    edge_session = requests.Session()
+    edge_results = []
+    with edge_session:
+        for insitudata_name in matchup_b.value.split(','):
+            bbox = ','.join(
+                [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)])
+            edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox,
+                                       platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session)
+            if edge_response['totalResults'] == 0:
+                continue
+            r = edge_response['results']
+            for p in r:
+                p['source'] = insitudata_name
+            edge_results.extend(r)
+    print "%s Time to call edge for partition %s to %s" % (str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
+    if len(edge_results) == 0:
+        return []
+
+    # Convert edge points to utm
+    the_time = datetime.now()
+    matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
+    for n, edge_point in enumerate(edge_results):
+        try:
+            x, y = wkt.loads(edge_point['point']).coords[0]
+        except ReadingError:
+            try:
+                x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
+            except ValueError:
+                y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
+
+        matchup_points[n][0], matchup_points[n][1] = pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=x, y=y)
+    print "%s Time to convert match points for partition %s to %s" % (
+        str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
+
+    # Build kdtree from matchup points
+    the_time = datetime.now()
+    m_tree = spatial.cKDTree(matchup_points, leafsize=30)
+    print "%s Time to build matchup tree" % (str(datetime.now() - the_time))
+
+    # The actual matching happens in the generator. This is so that we only load 1 tile into memory at a time
+    match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value,
+                                                      parameter_b.value, rt_b.value, lonlat_proj, aeqd_proj) for tile_id
+                        in tile_ids]
+
+    return chain(*match_generators)
+
+
+def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt,
+                                  search_parameter, radius_tolerance, lonlat_proj, aeqd_proj):
+    from nexustiles.model.nexusmodel import NexusPoint
+    from webservice.algorithms_spark.Matchup import DomsPoint  # Must import DomsPoint or Spark complains
+
+    # Load the tile and mask it to the search polygon
+    try:
+        the_time = datetime.now()
+        tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt),
+                                                  tile_service.find_tile_by_id(tile_id))[0]
+        print "%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id)
+    except IndexError:
+        # This should only happen if all measurements in a tile become masked after applying the bounding polygon
+        raise StopIteration
+
+    # Convert valid tile lat,lon pairs to the azimuthal equidistant projection (meters)
+    the_time = datetime.now()
+    # Get list of indices of valid values
+    valid_indices = tile.get_indices()
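+    # Each entry of valid_indices is a (time, lat, lon) index triple into the
+    # tile's data array.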
+    primary_points = np.array(
+        [pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=tile.longitudes[aslice[2]], y=tile.latitudes[aslice[1]]) for
+         aslice in valid_indices])
+
+    print "%s Time to convert primary points for tile %s" % (str(datetime.now() - the_time), tile_id)
+
+    a_time = datetime.now()
+    p_tree = spatial.cKDTree(primary_points, leafsize=30)
+    print "%s Time to build primary tree" % (str(datetime.now() - a_time))
+
+    a_time = datetime.now()
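+    # For every primary (tile) point, find all matchup points within
+    # radius_tolerance meters in the projected plane.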
+    matched_indexes = p_tree.query_ball_tree(m_tree, radius_tolerance)
+    print "%s Time to query primary tree for tile %s" % (str(datetime.now() - a_time), tile_id)
+    for i, point_matches in enumerate(matched_indexes):
+        if len(point_matches) > 0:
+            p_nexus_point = NexusPoint(tile.latitudes[valid_indices[i][1]],
+                                       tile.longitudes[valid_indices[i][2]], None,
+                                       tile.times[valid_indices[i][0]], valid_indices[i],
+                                       tile.data[tuple(valid_indices[i])])
+            p_doms_point = DomsPoint.from_nexus_point(p_nexus_point, tile=tile, parameter=search_parameter)
+            for m_point_index in point_matches:
+                m_doms_point = DomsPoint.from_edge_point(edge_results[m_point_index])
+                yield p_doms_point, m_doms_point
+
+
+def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, itemsPerPage=1000,
+               startIndex=0, stats=True, session=None):
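+    """
+    Query the EDGE in situ service for points matching the given constraints.
+
+    startTime and endTime may be epoch seconds or ISO 8601 strings; platform may
+    be a comma-separated string or a list.  Returns the parsed JSON response with
+    every result page merged into its 'results' list.
+    """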
+    try:
+        startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+    except TypeError:
+        # Assume we were passed a properly formatted string
+        pass
+
+    try:
+        endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+    except TypeError:
+        # Assume we were passed a properly formatted string
+        pass
+
+    try:
+        platform = platform.split(',')
+    except AttributeError:
+        # Assume we were passed a list
+        pass
+
+    params = {"variable": variable,
+              "startTime": startTime,
+              "endTime": endTime,
+              "bbox": bbox,
+              "minDepth": depth_min,
+              "maxDepth": depth_max,
+              "platform": platform,
+              "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
+
+    if session is not None:
+        edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+    else:
+        edge_request = requests.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+
+    edge_request.raise_for_status()
+    edge_response = json.loads(edge_request.text)
+
+    # Page through any remaining results by following the 'next' links
+    next_page_url = edge_response.get('next', None)
+    while next_page_url is not None:
+        if session is not None:
+            edge_page_request = session.get(next_page_url)
+        else:
+            edge_page_request = requests.get(next_page_url)
+
+        edge_page_request.raise_for_status()
+        edge_page_response = json.loads(edge_page_request.text)
+
+        edge_response['results'].extend(edge_page_response['results'])
+
+        next_page_url = edge_page_response.get('next', None)
+
+    return edge_response

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
new file mode 100644
index 0000000..e332654
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -0,0 +1,248 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import logging
+
+import numpy as np
+from nexustiles.nexustiles import NexusTileService
+
+from webservice.NexusHandler import nexus_handler, SparkHandler, DEFAULT_PARAMETERS_SPEC
+from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException
+
+
+@nexus_handler
+class TimeAvgMapSparkHandlerImpl(SparkHandler):
+    name = "Time Average Map Spark"
+    path = "/timeAvgMapSpark"
+    description = "Computes a Latitude/Longitude Time Average plot given an arbitrary geographical area and time range"
+    params = DEFAULT_PARAMETERS_SPEC
+    singleton = True
+
+    def __init__(self):
+        SparkHandler.__init__(self)
+        self.log = logging.getLogger(__name__)
+        # self.log.setLevel(logging.DEBUG)
+
+    @staticmethod
+    def _map(tile_in_spark):
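+        # tile_in_spark is a (tile_bounds, start_time, end_time, dataset) tuple.
+        # Returns ((min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile)) so
+        # partial sums and counts can be reduced by bounding box across time slices.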
+        tile_bounds = tile_in_spark[0]
+        (min_lat, max_lat, min_lon, max_lon,
+         min_y, max_y, min_x, max_x) = tile_bounds
+        startTime = tile_in_spark[1]
+        endTime = tile_in_spark[2]
+        ds = tile_in_spark[3]
+        tile_service = NexusTileService()
+        tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1)
+        # Process the time range in fixed-size chunks to bound memory use per call
+        days_at_a_time = 30
+        t_incr = 86400 * days_at_a_time
+        sum_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.float64))
+        cnt_tile = np.array(np.zeros(tile_inbounds_shape, dtype=np.uint32))
+        t_start = startTime
+        while t_start <= endTime:
+            t_end = min(t_start + t_incr, endTime)
+            # (Alternatively the query can be split spatially via
+            #  TimeAvgMapSparkHandlerImpl.query_by_parts.)
+            nexus_tiles = \
+                tile_service.get_tiles_bounded_by_box(min_lat, max_lat,
+                                                      min_lon, max_lon,
+                                                      ds=ds,
+                                                      start_time=t_start,
+                                                      end_time=t_end)
+
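+            # Accumulate the running sum and the count of unmasked observations
+            # for every grid cell inside the tile bounds.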
+            for tile in nexus_tiles:
+                tile.data.data[:, :] = np.nan_to_num(tile.data.data)
+                sum_tile += tile.data.data[0, min_y:max_y + 1, min_x:max_x + 1]
+                cnt_tile += (~tile.data.mask[0,
+                              min_y:max_y + 1,
+                              min_x:max_x + 1]).astype(np.uint8)
+            t_start = t_end + 1
+
+        # The average itself is computed after the Spark reduce (see calc) so that
+        # partial sums and counts from different time slices are combined first.
+        return ((min_lat, max_lat, min_lon, max_lon), (sum_tile, cnt_tile))
+
+    def calc(self, computeOptions, **args):
+        """
+
+        :param computeOptions: StatsComputeOptions
+        :param args: dict
+        :return:
+        """
+
+        spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
+        self._setQueryParams(computeOptions.get_dataset()[0],
+                             (float(computeOptions.get_min_lat()),
+                              float(computeOptions.get_max_lat()),
+                              float(computeOptions.get_min_lon()),
+                              float(computeOptions.get_max_lon())),
+                             computeOptions.get_start_time(),
+                             computeOptions.get_end_time(),
+                             spark_master=spark_master,
+                             spark_nexecs=spark_nexecs,
+                             spark_nparts=spark_nparts)
+
+        if 'CLIM' in self._ds:
+            raise NexusProcessingException(
+                reason="Cannot compute Latitude/Longitude Time Average plot on a climatology", code=400)
+
+        nexus_tiles = self._find_global_tile_set()
+
+        if len(nexus_tiles) == 0:
+            raise NoDataException(reason="No data found for selected timeframe")
+
+        self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))
+
+        self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
+        nlats = int((self._maxLat - self._minLatCent) / self._latRes) + 1
+        nlons = int((self._maxLon - self._minLonCent) / self._lonRes) + 1
+        self.log.debug('nlats={0}, nlons={1}'.format(nlats, nlons))
+        self.log.debug('center lat range = {0} to {1}'.format(self._minLatCent,
+                                                              self._maxLatCent))
+        self.log.debug('center lon range = {0} to {1}'.format(self._minLonCent,
+                                                              self._maxLonCent))
+
+        # Create array of tuples to pass to Spark map function
+        nexus_tiles_spark = [[self._find_tile_bounds(t),
+                              self._startTime, self._endTime,
+                              self._ds] for t in nexus_tiles]
+        # Remove empty tiles (their bounds come back as None)
+        bad_tile_inds = np.where([t[0] is None for t in nexus_tiles_spark])[0]
+        for i in np.flipud(bad_tile_inds):
+            del nexus_tiles_spark[i]
+
+        # Expand Spark map tuple array by duplicating each entry N times,
+        # where N is the number of ways we want the time dimension carved up.
+        num_time_parts = 72
+        # num_time_parts = 1
+        nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0)
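+        # e.g. with two tiles T1, T2 and num_time_parts = 3 this yields
+        # [T1, T1, T1, T2, T2, T2]; each copy is then given its own time slice.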
+        self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
+
+        # Set the time boundaries for each of the Spark map tuples.
+        # Every Nth element in the array gets the same time bounds.
+        spark_part_times = np.linspace(self._startTime, self._endTime,
+                                       num_time_parts + 1, dtype=np.int64)
+
+        spark_part_time_ranges = \
+            np.repeat([[[spark_part_times[i],
+                         spark_part_times[i + 1]] for i in range(num_time_parts)]],
+                      len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2))
+        self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
+        nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
+
+        # Launch Spark computations
+        rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+        sum_count_part = rdd.map(self._map)
+        sum_count = \
+            sum_count_part.combineByKey(lambda val: val,
+                                        lambda x, val: (x[0] + val[0],
+                                                        x[1] + val[1]),
+                                        lambda x, y: (x[0] + y[0], x[1] + y[1]))
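+        # combineByKey merges the per-time-slice (sum, cnt) arrays for each
+        # lat/lon bounding box back into a single (sum, cnt) pair per tile.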
+        fill = self._fill
+        avg_tiles = \
+            sum_count.map(lambda (bounds, (sum_tile, cnt_tile)):
+                          (bounds, [[{'avg': (sum_tile[y, x] / cnt_tile[y, x])
+                          if (cnt_tile[y, x] > 0)
+                          else fill,
+                                      'cnt': cnt_tile[y, x]}
+                                     for x in
+                                     range(sum_tile.shape[1])]
+                                    for y in
+                                    range(sum_tile.shape[0])])).collect()
+
+        # Combine subset results to produce global map.
+        #
+        # The tiles below are NOT Nexus objects.  They are tuples
+        # with the time avg map data and lat-lon bounding box.
+        a = np.zeros((nlats, nlons), dtype=np.float64, order='C')
+        n = np.zeros((nlats, nlons), dtype=np.uint32, order='C')
+        for tile in avg_tiles:
+            if tile is not None:
+                ((tile_min_lat, tile_max_lat, tile_min_lon, tile_max_lon),
+                 tile_stats) = tile
+                tile_data = np.ma.array(
+                    [[tile_stats[y][x]['avg'] for x in range(len(tile_stats[0]))] for y in range(len(tile_stats))])
+                tile_cnt = np.array(
+                    [[tile_stats[y][x]['cnt'] for x in range(len(tile_stats[0]))] for y in range(len(tile_stats))])
+                tile_data.mask = ~(tile_cnt.astype(bool))
+                y0 = self._lat2ind(tile_min_lat)
+                y1 = y0 + tile_data.shape[0] - 1
+                x0 = self._lon2ind(tile_min_lon)
+                x1 = x0 + tile_data.shape[1] - 1
+                if np.any(np.logical_not(tile_data.mask)):
+                    self.log.debug(
+                        'writing tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(tile_min_lat,
+                                                                                                     tile_max_lat,
+                                                                                                     tile_min_lon,
+                                                                                                     tile_max_lon, y0,
+                                                                                                     y1, x0, x1))
+                    a[y0:y1 + 1, x0:x1 + 1] = tile_data
+                    n[y0:y1 + 1, x0:x1 + 1] = tile_cnt
+                else:
+                    self.log.debug(
+                        'All pixels masked in tile lat {0}-{1}, lon {2}-{3}, map y {4}-{5}, map x {6}-{7}'.format(
+                            tile_min_lat, tile_max_lat,
+                            tile_min_lon, tile_max_lon,
+                            y0, y1, x0, x1))
+
+        # Store global map in a NetCDF file.
+        self._create_nc_file(a, 'tam.nc', 'val', fill=self._fill)
+
+        # Create dict for JSON response
+        results = [[{'avg': a[y, x], 'cnt': int(n[y, x]),
+                     'lat': self._ind2lat(y), 'lon': self._ind2lon(x)}
+                    for x in range(a.shape[1])] for y in range(a.shape[0])]
+
+        return TimeAvgMapSparkResults(results=results, meta={}, computeOptions=computeOptions)
+
+
+class TimeAvgMapSparkResults(NexusResults):
+    def __init__(self, results=None, meta=None, computeOptions=None):
+        NexusResults.__init__(self, results=results, meta=meta, stats=None, computeOptions=computeOptions)