You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by ea...@apache.org on 2020/08/05 22:27:38 UTC

[incubator-sdap-nexus] branch bug_fixes updated: revert

This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch bug_fixes
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/bug_fixes by this push:
     new c905c37  revert
c905c37 is described below

commit c905c373c33b71d116d26465254e0012658f114d
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Wed Aug 5 15:27:27 2020 -0700

    revert
---
 analysis/tests/algorithms_spark/Matchup_test.py | 321 +++++++++++
 analysis/tests/algorithms_spark/__init__.py     |  16 +
 analysis/webservice/algorithms_spark/Matchup.py | 703 ++++++++++++++++++++++++
 3 files changed, 1040 insertions(+)

diff --git a/analysis/tests/algorithms_spark/Matchup_test.py b/analysis/tests/algorithms_spark/Matchup_test.py
new file mode 100644
index 0000000..5dee17c
--- /dev/null
+++ b/analysis/tests/algorithms_spark/Matchup_test.py
@@ -0,0 +1,321 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import pickle
+import random
+import timeit
+import unittest
+
+from webservice.algorithms_spark.Matchup import *
+
+
+class TestMatch_Points(unittest.TestCase):
+    def test_one_point_match_exact(self):
+        primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
+        matchup = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=2)
+
+        primary_points = [primary]
+        matchup_points = [matchup]
+
+        matches = list(match_points_generator(primary_points, matchup_points, 0))
+
+        self.assertEquals(1, len(matches))
+
+        p_match_point, match = matches[0]
+
+        self.assertEqual(primary, p_match_point)
+        self.assertEqual(matchup, match)
+
+    def test_one_point_match_within_tolerance_150km(self):
+        primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
+        matchup = DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=5.0, data_id=2)
+
+        primary_points = [primary]
+        matchup_points = [matchup]
+
+        matches = list(match_points_generator(primary_points, matchup_points, 150000))  # tolerance 150 km
+
+        self.assertEquals(1, len(matches))
+
+        p_match_point, match = matches[0]
+
+        self.assertEqual(primary, p_match_point)
+        self.assertEqual(matchup, match)
+
+    def test_one_point_match_within_tolerance_200m(self):
+        primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
+        matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000, depth=5.0, data_id=2)
+
+        primary_points = [primary]
+        matchup_points = [matchup]
+
+        matches = list(match_points_generator(primary_points, matchup_points, 200))  # tolerance 200 m
+
+        self.assertEquals(1, len(matches))
+
+        p_match_point, match = matches[0]
+
+        self.assertEqual(primary, p_match_point)
+        self.assertEqual(matchup, match)
+
+    def test_one_point_not_match_tolerance_150km(self):
+        primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
+        matchup = DomsPoint(longitude=1.0, latitude=4.0, time=1000, depth=5.0, data_id=2)
+
+        primary_points = [primary]
+        matchup_points = [matchup]
+
+        matches = list(match_points_generator(primary_points, matchup_points, 150000))  # tolerance 150 km
+
+        self.assertEquals(0, len(matches))
+
+    def test_one_point_not_match_tolerance_100m(self):
+        primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
+        matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000, depth=5.0, data_id=2)
+
+        primary_points = [primary]
+        matchup_points = [matchup]
+
+        matches = list(match_points_generator(primary_points, matchup_points, 100))  # tolerance 100 m
+
+        self.assertEquals(0, len(matches))
+
+    def test_multiple_point_match(self):
+        primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
+        primary_points = [primary]
+
+        matchup_points = [
+            DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0, data_id=2),
+            DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0, data_id=3),
+            DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0, data_id=4)
+        ]
+
+        matches = list(match_points_generator(primary_points, matchup_points, 150000))  # tolerance 150 km
+
+        self.assertEquals(3, len(matches))
+
+        self.assertSetEqual({primary}, {x[0] for x in matches})
+
+        list_of_matches = [x[1] for x in matches]
+
+        self.assertEquals(3, len(list_of_matches))
+        self.assertItemsEqual(matchup_points, list_of_matches)
+
+    def test_multiple_point_match_multiple_times(self):
+        primary_points = [
+            DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1),
+            DomsPoint(longitude=1.5, latitude=1.5, time=1000, depth=5.0, data_id=2)
+        ]
+
+        matchup_points = [
+            DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0, data_id=3),
+            DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0, data_id=4),
+            DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0, data_id=5)
+        ]
+
+        matches = list(match_points_generator(primary_points, matchup_points, 150000))  # tolerance 150 km
+
+        self.assertEquals(5, len(matches))
+
+        self.assertSetEqual({p for p in primary_points}, {x[0] for x in matches})
+
+        # First primary point matches all 3 secondary
+        self.assertEquals(3, [x[0] for x in matches].count(primary_points[0]))
+        self.assertItemsEqual(matchup_points, [x[1] for x in matches if x[0] == primary_points[0]])
+
+        # Second primary point matches only last 2 secondary
+        self.assertEquals(2, [x[0] for x in matches].count(primary_points[1]))
+        self.assertItemsEqual(matchup_points[1:], [x[1] for x in matches if x[0] == primary_points[1]])
+
+    def test_one_of_many_primary_matches_one_of_many_matchup(self):
+        primary_points = [
+            DomsPoint(longitude=-33.76764, latitude=30.42946, time=1351553994, data_id=1),
+            DomsPoint(longitude=-33.75731, latitude=29.86216, time=1351554004, data_id=2)
+        ]
+
+        matchup_points = [
+            DomsPoint(longitude=-33.762, latitude=28.877, time=1351521432, depth=3.973, data_id=3),
+            DomsPoint(longitude=-34.916, latitude=28.879, time=1351521770, depth=2.9798, data_id=4),
+            DomsPoint(longitude=-31.121, latitude=31.256, time=1351519892, depth=4.07, data_id=5)
+        ]
+
+        matches = list(match_points_generator(primary_points, matchup_points, 110000))  # tolerance 110 km
+
+        self.assertEquals(1, len(matches))
+
+        self.assertSetEqual({p for p in primary_points if p.data_id == 2}, {x[0] for x in matches})
+
+        # First primary point matches none
+        self.assertEquals(0, [x[0] for x in matches].count(primary_points[0]))
+
+        # Second primary point matches only first secondary
+        self.assertEquals(1, [x[0] for x in matches].count(primary_points[1]))
+        self.assertItemsEqual(matchup_points[0:1], [x[1] for x in matches if x[0] == primary_points[1]])
+
+    @unittest.skip("This test is just for timing, doesn't actually assert anything.")
+    def test_time_many_primary_many_matchup(self):
+        import logging
+        import sys
+        logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+                            datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+        log = logging.getLogger(__name__)
+        # Generate 160000 DomsPoints distributed equally in a box from -2.0 lat/lon to 2.0 lat/lon
+        log.info("Generating primary points")
+        x = np.arange(-2.0, 2.0, 0.01)
+        y = np.arange(-2.0, 2.0, 0.01)
+        primary_points = [DomsPoint(longitude=xy[0], latitude=xy[1], time=1000, depth=5.0, data_id=i) for i, xy in
+                          enumerate(np.array(np.meshgrid(x, y)).T.reshape(-1, 2))]
+
+        # Generate 2000 DomsPoints distributed randomly in a box from -2.0 lat/lon to 2.0 lat/lon
+        log.info("Generating matchup points")
+        matchup_points = [
+            DomsPoint(longitude=random.uniform(-2.0, 2.0), latitude=random.uniform(-2.0, 2.0), time=1000, depth=5.0,
+                      data_id=i) for i in xrange(0, 2000)]
+
+        log.info("Starting matchup")
+        log.info("Best of repeat(3, 2) matchups: %s seconds" % min(
+            timeit.repeat(lambda: list(match_points_generator(primary_points, matchup_points, 1500)), repeat=3,
+                          number=2)))
+
+
+class TestDOMSPoint(unittest.TestCase):
+    def test_is_pickleable(self):
+        edge_point = json.loads("""{
+"id": "argo-profiles-5903995(46, 0)",
+"time": "2012-10-15T14:24:04Z",
+"point": "-33.467 29.728",
+"sea_water_temperature": 24.5629997253,
+"sea_water_temperature_depth": 2.9796258642,
+"wind_speed": null,
+"sea_water_salinity": null,
+"sea_water_salinity_depth": null,
+"platform": 4,
+"device": 3,
+"fileurl": "ftp://podaac-ftp.jpl.nasa.gov/allData/insitu/L2/spurs1/argo/argo-profiles-5903995.nc"
+}""")
+        point = DomsPoint.from_edge_point(edge_point)
+        self.assertIsNotNone(pickle.dumps(point))
+
+
+def check_all():
+    return check_solr() and check_cass() and check_edge()
+
+
+def check_solr():
+    # TODO eventually this might do something.
+    return False
+
+
+def check_cass():
+    # TODO eventually this might do something.
+    return False
+
+
+def check_edge():
+    # TODO eventually this might do something.
+    return False
+
+
+@unittest.skipUnless(check_all(),
+                     "These tests require local instances of Solr, Cassandra, and Edge to be running.")
+class TestMatchup(unittest.TestCase):
+    def setUp(self):
+        from os import environ
+        environ['PYSPARK_DRIVER_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
+        environ['PYSPARK_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
+        environ['SPARK_HOME'] = '/Users/greguska/sandbox/spark-2.0.0-bin-hadoop2.7'
+
+    def test_mur_match(self):
+        from shapely.wkt import loads
+        from nexustiles.nexustiles import NexusTileService
+
+        polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))")
+        primary_ds = "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1"
+        matchup_ds = "spurs"
+        parameter = "sst"
+        start_time = 1350259200  # 2012-10-15T00:00:00Z
+        end_time = 1350345600  # 2012-10-16T00:00:00Z
+        time_tolerance = 86400
+        depth_tolerance = 5.0
+        radius_tolerance = 1500.0
+        platforms = "1,2,3,4,5,6,7,8,9"
+
+        tile_service = NexusTileService()
+        tile_ids = [tile.tile_id for tile in
+                    tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False,
+                                                       fl='id')]
+        result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance,
+                                      depth_tolerance, radius_tolerance, platforms)
+        for k, v in result.iteritems():
+            print "primary: %s\n\tmatches:\n\t\t%s" % (
+                "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst),
+                '\n\t\t'.join(
+                    ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v]))
+
+    def test_smap_match(self):
+        from shapely.wkt import loads
+        from nexustiles.nexustiles import NexusTileService
+
+        polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))")
+        primary_ds = "SMAP_L2B_SSS"
+        matchup_ds = "spurs"
+        parameter = "sss"
+        start_time = 1350259200  # 2012-10-15T00:00:00Z
+        end_time = 1350345600  # 2012-10-16T00:00:00Z
+        time_tolerance = 86400
+        depth_tolerance = 5.0
+        radius_tolerance = 1500.0
+        platforms = "1,2,3,4,5,6,7,8,9"
+
+        tile_service = NexusTileService()
+        tile_ids = [tile.tile_id for tile in
+                    tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False,
+                                                       fl='id')]
+        result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance,
+                                      depth_tolerance, radius_tolerance, platforms)
+        for k, v in result.iteritems():
+            print "primary: %s\n\tmatches:\n\t\t%s" % (
+                "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst),
+                '\n\t\t'.join(
+                    ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v]))
+
+    def test_ascatb_match(self):
+        from shapely.wkt import loads
+        from nexustiles.nexustiles import NexusTileService
+
+        polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))")
+        primary_ds = "ASCATB-L2-Coastal"
+        matchup_ds = "spurs"
+        parameter = "wind"
+        start_time = 1351468800  # 2012-10-29T00:00:00Z
+        end_time = 1351555200  # 2012-10-30T00:00:00Z
+        time_tolerance = 86400
+        depth_tolerance = 5.0
+        radius_tolerance = 110000.0  # 110 km
+        platforms = "1,2,3,4,5,6,7,8,9"
+
+        tile_service = NexusTileService()
+        tile_ids = [tile.tile_id for tile in
+                    tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False,
+                                                       fl='id')]
+        result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance,
+                                      depth_tolerance, radius_tolerance, platforms)
+        for k, v in result.iteritems():
+            print "primary: %s\n\tmatches:\n\t\t%s" % (
+                "lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (k.longitude, k.latitude, k.time, k.wind_u, k.wind_v),
+                '\n\t\t'.join(
+                    ["lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (
+                        i.longitude, i.latitude, i.time, i.wind_u, i.wind_v) for i in v]))
diff --git a/analysis/tests/algorithms_spark/__init__.py b/analysis/tests/algorithms_spark/__init__.py
new file mode 100644
index 0000000..0707368
--- /dev/null
+++ b/analysis/tests/algorithms_spark/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
new file mode 100644
index 0000000..9ae7557
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -0,0 +1,703 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+import json
+import logging
+import threading
+from datetime import datetime
+from itertools import chain
+from math import cos, radians
+
+import numpy as np
+import pyproj
+import requests
+from nexustiles.nexustiles import NexusTileService
+from pytz import timezone, UTC
+from scipy import spatial
+from shapely import wkt
+from shapely.geometry import Point
+from shapely.geometry import box
+from shapely.geos import WKTReadingError
+
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms.doms import config as edge_endpoints
+from webservice.algorithms.doms import values as doms_values
+from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults
+from webservice.algorithms.doms.ResultsStorage import ResultsStorage
+from webservice.webmodel import NexusProcessingException
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+def iso_time_to_epoch(str_time):
+    return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace(
+        tzinfo=UTC) - EPOCH).total_seconds()
+
+
+@nexus_handler
+class Matchup(NexusCalcSparkHandler):
+    name = "Matchup"
+    path = "/match_spark"
+    description = "Match measurements between two or more datasets"
+
+    params = {
+        "primary": {
+            "name": "Primary Dataset",
+            "type": "string",
+            "description": "The Primary dataset used to find matches for. Required"
+        },
+        "matchup": {
+            "name": "Match-Up Datasets",
+            "type": "comma-delimited string",
+            "description": "The Dataset(s) being searched for measurements that match the Primary. Required"
+        },
+        "parameter": {
+            "name": "Match-Up Parameter",
+            "type": "string",
+            "description": "The parameter of interest used for the match up. One of 'sst', 'sss', 'wind'. Required"
+        },
+        "startTime": {
+            "name": "Start Time",
+            "type": "string",
+            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "endTime": {
+            "name": "End Time",
+            "type": "string",
+            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+        },
+        "b": {
+            "name": "Bounding box",
+            "type": "comma-delimited float",
+            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+        },
+        "depthMin": {
+            "name": "Minimum Depth",
+            "type": "float",
+            "description": "Minimum depth of measurements. Must be less than depthMax. Optional. Default: no limit"
+        },
+        "depthMax": {
+            "name": "Maximum Depth",
+            "type": "float",
+            "description": "Maximum depth of measurements. Must be greater than depthMin. Optional. Default: no limit"
+        },
+        "tt": {
+            "name": "Time Tolerance",
+            "type": "long",
+            "description": "Tolerance in time (seconds) when comparing two measurements. Optional. Default: 86400"
+        },
+        "rt": {
+            "name": "Radius Tolerance",
+            "type": "float",
+            "description": "Tolerance in radius (meters) when comparing two measurements. Optional. Default: 1000"
+        },
+        "platforms": {
+            "name": "Platforms",
+            "type": "comma-delimited integer",
+            "description": "Platforms to include for matchup consideration. Required"
+        },
+        "matchOnce": {
+            "name": "Match Once",
+            "type": "boolean",
+            "description": "Optional True/False flag used to determine if more than one match per primary point is returned. "
+                           + "If true, only the nearest point will be returned for each primary point. "
+                           + "If false, all points within the tolerances will be returned for each primary point. Default: False"
+        },
+        "resultSizeLimit": {
+            "name": "Result Size Limit",
+            "type": "int",
+            "description": "Optional integer value that limits the number of results returned from the matchup. "
+                           "If the number of primary matches is greater than this limit, the service will respond with "
+                           "(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. "
+                           "Default: 500"
+        }
+    }
+    singleton = True
+
+    def __init__(self, algorithm_config=None, sc=None):
+        NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, skipCassandra=True)
+        self.log = logging.getLogger(__name__)
+
+    def parse_arguments(self, request):
+        # Parse input arguments
+        self.log.debug("Parsing arguments")
+        try:
+            bounding_polygon = request.get_bounding_polygon()
+        except:
+            raise NexusProcessingException(
+                reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+                code=400)
+        primary_ds_name = request.get_argument('primary', None)
+        if primary_ds_name is None:
+            raise NexusProcessingException(reason="'primary' argument is required", code=400)
+        matchup_ds_names = request.get_argument('matchup', None)
+        if matchup_ds_names is None:
+            raise NexusProcessingException(reason="'matchup' argument is required", code=400)
+
+        parameter_s = request.get_argument('parameter', 'sst')
+        if parameter_s not in ['sst', 'sss', 'wind']:
+            raise NexusProcessingException(
+                reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+        try:
+            start_time = request.get_start_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+        try:
+            end_time = request.get_end_datetime()
+        except:
+            raise NexusProcessingException(
+                reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+                code=400)
+
+        if start_time > end_time:
+            raise NexusProcessingException(
+                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+                code=400)
+
+        depth_min = request.get_decimal_arg('depthMin', default=None)
+        depth_max = request.get_decimal_arg('depthMax', default=None)
+
+        if depth_min is not None and depth_max is not None and depth_min >= depth_max:
+            raise NexusProcessingException(
+                reason="Depth Min should be less than Depth Max", code=400)
+
+        time_tolerance = request.get_int_arg('tt', default=86400)
+        radius_tolerance = request.get_decimal_arg('rt', default=1000.0)
+        platforms = request.get_argument('platforms', None)
+        if platforms is None:
+            raise NexusProcessingException(reason="'platforms' argument is required", code=400)
+        try:
+            p_validation = platforms.split(',')
+            p_validation = [int(p) for p in p_validation]
+            del p_validation
+        except:
+            raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
+
+        match_once = request.get_boolean_arg("matchOnce", default=False)
+
+        result_size_limit = request.get_int_arg("resultSizeLimit", default=500)
+
+        start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+        end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+        return bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
+               start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
+               depth_min, depth_max, time_tolerance, radius_tolerance, \
+               platforms, match_once, result_size_limit
+
+    def calc(self, request, **args):
+        start = datetime.utcnow()
+        # TODO Assuming Satellite primary
+        bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
+        start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
+        depth_min, depth_max, time_tolerance, radius_tolerance, \
+        platforms, match_once, result_size_limit = self.parse_arguments(request)
+
+        with ResultsStorage() as resultsStorage:
+
+            execution_id = str(resultsStorage.insertExecution(None, start, None, None))
+
+        self.log.debug("Querying for tiles in search domain")
+        # Get tile ids in box
+        tile_ids = [tile.tile_id for tile in
+                    self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
+                                                             start_seconds_from_epoch, end_seconds_from_epoch,
+                                                             fetch_data=False, fl='id',
+                                                             sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+                                                                   'tile_min_lat asc'], rows=5000)]
+
+        # Call spark_matchup
+        self.log.debug("Calling Spark Driver")
+        try:
+            spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
+                                                matchup_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
+                                                radius_tolerance, platforms, match_once, sc=self._sc)
+        except Exception as e:
+            self.log.exception(e)
+            raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
+
+        end = datetime.utcnow()
+
+        self.log.debug("Building and saving results")
+        args = {
+            "primary": primary_ds_name,
+            "matchup": matchup_ds_names,
+            "startTime": start_time,
+            "endTime": end_time,
+            "bbox": request.get_argument('b'),
+            "timeTolerance": time_tolerance,
+            "radiusTolerance": float(radius_tolerance),
+            "platforms": platforms,
+            "parameter": parameter_s
+        }
+
+        if depth_min is not None:
+            args["depthMin"] = float(depth_min)
+
+        if depth_max is not None:
+            args["depthMax"] = float(depth_max)
+
+        total_keys = len(spark_result.keys())
+        total_values = sum(len(v) for v in spark_result.itervalues())
+        details = {
+            "timeToComplete": int((end - start).total_seconds()),
+            "numInSituRecords": 0,
+            "numInSituMatched": total_values,
+            "numGriddedChecked": 0,
+            "numGriddedMatched": total_keys
+        }
+
+        matches = Matchup.convert_to_matches(spark_result)
+
+        def do_result_insert():
+            with ResultsStorage() as storage:
+                storage.insertResults(results=matches, params=args, stats=details,
+                                      startTime=start, completeTime=end, userEmail="",
+                                      execution_id=execution_id)
+
+        threading.Thread(target=do_result_insert).start()
+
+        if 0 < result_size_limit < len(matches):
+            result = DomsQueryResults(results=None, args=args, details=details, bounds=None, count=None,
+                                      computeOptions=None, executionId=execution_id, status_code=202)
+        else:
+            result = DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None,
+                                      computeOptions=None, executionId=execution_id)
+
+        return result
+
+    @classmethod
+    def convert_to_matches(cls, spark_result):
+        matches = []
+        for primary_domspoint, matched_domspoints in spark_result.iteritems():
+            p_matched = [cls.domspoint_to_dict(p_match) for p_match in matched_domspoints]
+
+            primary = cls.domspoint_to_dict(primary_domspoint)
+            primary['matches'] = list(p_matched)
+            matches.append(primary)
+        return matches
+
+    @staticmethod
+    def domspoint_to_dict(domspoint):
+        return {
+            "sea_water_temperature": domspoint.sst,
+            "sea_water_temperature_depth": domspoint.sst_depth,
+            "sea_water_salinity": domspoint.sss,
+            "sea_water_salinity_depth": domspoint.sss_depth,
+            "wind_speed": domspoint.wind_speed,
+            "wind_direction": domspoint.wind_direction,
+            "wind_u": domspoint.wind_u,
+            "wind_v": domspoint.wind_v,
+            "platform": doms_values.getPlatformById(domspoint.platform),
+            "device": doms_values.getDeviceById(domspoint.device),
+            "x": str(domspoint.longitude),
+            "y": str(domspoint.latitude),
+            "point": "Point(%s %s)" % (domspoint.longitude, domspoint.latitude),
+            "time": datetime.strptime(domspoint.time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC),
+            "fileurl": domspoint.file_url,
+            "id": domspoint.data_id,
+            "source": domspoint.source,
+        }
+
+
+class DomsPoint(object):
+    def __init__(self, longitude=None, latitude=None, time=None, depth=None, data_id=None):
+
+        self.time = time
+        self.longitude = longitude
+        self.latitude = latitude
+        self.depth = depth
+        self.data_id = data_id
+
+        self.wind_u = None
+        self.wind_v = None
+        self.wind_direction = None
+        self.wind_speed = None
+        self.sst = None
+        self.sst_depth = None
+        self.sss = None
+        self.sss_depth = None
+        self.source = None
+        self.depth = None
+        self.platform = None
+        self.device = None
+        self.file_url = None
+
+    def __repr__(self):
+        return str(self.__dict__)
+
+    @staticmethod
+    def from_nexus_point(nexus_point, tile=None, parameter='sst'):
+        point = DomsPoint()
+
+        point.data_id = "%s[%s]" % (tile.tile_id, nexus_point.index)
+
+        # TODO Not an ideal solution; but it works for now.
+        if parameter == 'sst':
+            point.sst = nexus_point.data_val.item()
+        elif parameter == 'sss':
+            point.sss = nexus_point.data_val.item()
+        elif parameter == 'wind':
+            point.wind_u = nexus_point.data_val.item()
+            try:
+                point.wind_v = tile.meta_data['wind_v'][tuple(nexus_point.index)].item()
+            except (KeyError, IndexError):
+                pass
+            try:
+                point.wind_direction = tile.meta_data['wind_dir'][tuple(nexus_point.index)].item()
+            except (KeyError, IndexError):
+                pass
+            try:
+                point.wind_speed = tile.meta_data['wind_speed'][tuple(nexus_point.index)].item()
+            except (KeyError, IndexError):
+                pass
+        else:
+            raise NotImplementedError('%s not supported. Only sst, sss, and wind parameters are supported.' % parameter)
+
+        point.longitude = nexus_point.longitude.item()
+        point.latitude = nexus_point.latitude.item()
+
+        point.time = datetime.utcfromtimestamp(nexus_point.time).strftime('%Y-%m-%dT%H:%M:%SZ')
+
+        try:
+            point.depth = nexus_point.depth
+        except KeyError:
+            # No depth associated with this measurement
+            pass
+
+        point.sst_depth = 0
+        point.source = tile.dataset
+        point.file_url = tile.granule
+
+        # TODO device should change based on the satellite making the observations.
+        point.platform = 9
+        point.device = 5
+        return point
+
+    @staticmethod
+    def from_edge_point(edge_point):
+        point = DomsPoint()
+
+        try:
+            x, y = wkt.loads(edge_point['point']).coords[0]
+        except WKTReadingError:
+            try:
+                x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
+            except ValueError:
+                y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
+
+        point.longitude = x
+        point.latitude = y
+
+        point.time = edge_point['time']
+
+        point.wind_u = edge_point.get('eastward_wind')
+        point.wind_v = edge_point.get('northward_wind')
+        point.wind_direction = edge_point.get('wind_direction')
+        point.wind_speed = edge_point.get('wind_speed')
+        point.sst = edge_point.get('sea_water_temperature')
+        point.sst_depth = edge_point.get('sea_water_temperature_depth')
+        point.sss = edge_point.get('sea_water_salinity')
+        point.sss_depth = edge_point.get('sea_water_salinity_depth')
+        point.source = edge_point.get('source')
+        point.platform = edge_point.get('platform')
+        point.device = edge_point.get('device')
+        point.file_url = edge_point.get('fileurl')
+
+        try:
+            point.data_id = unicode(edge_point['id'])
+        except KeyError:
+            point.data_id = "%s:%s:%s" % (point.time, point.longitude, point.latitude)
+
+        return point
+
+
+from threading import Lock
+
+DRIVER_LOCK = Lock()
+
+
+def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_names, parameter, depth_min, depth_max,
+                         time_tolerance, radius_tolerance, platforms, match_once, sc=None):
+    from functools import partial
+
+    with DRIVER_LOCK:
+        # Broadcast parameters
+        primary_b = sc.broadcast(primary_ds_name)
+        matchup_b = sc.broadcast(matchup_ds_names)
+        depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None)
+        depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None)
+        tt_b = sc.broadcast(time_tolerance)
+        rt_b = sc.broadcast(float(radius_tolerance))
+        platforms_b = sc.broadcast(platforms)
+        bounding_wkt_b = sc.broadcast(bounding_wkt)
+        parameter_b = sc.broadcast(parameter)
+
+        # Parallelize list of tile ids
+        rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids)))
+
+    # Map Partitions ( list(tile_id) )
+    rdd_filtered = rdd.mapPartitions(
+        partial(match_satellite_to_insitu, primary_b=primary_b, matchup_b=matchup_b, parameter_b=parameter_b, tt_b=tt_b,
+                rt_b=rt_b, platforms_b=platforms_b, bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b,
+                depth_max_b=depth_max_b), preservesPartitioning=True) \
+        .filter(lambda p_m_tuple: abs(
+        iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)) <= time_tolerance)
+
+    if match_once:
+        # Only the 'nearest' point for each primary should be returned. Add an extra map/reduce which calculates
+        # the distance and finds the minimum
+
+        # Method used for calculating the distance between 2 DomsPoints
+        from pyproj import Geod
+
+        def dist(primary, matchup):
+            wgs84_geod = Geod(ellps='WGS84')
+            lat1, lon1 = (primary.latitude, primary.longitude)
+            lat2, lon2 = (matchup.latitude, matchup.longitude)
+            az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2)
+            return distance
+
+        rdd_filtered = rdd_filtered \
+            .map(lambda (primary, matchup): tuple([primary, tuple([matchup, dist(primary, matchup)])])) \
+            .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \
+            .mapValues(lambda x: [x[0]])
+    else:
+        rdd_filtered = rdd_filtered \
+            .combineByKey(lambda value: [value],  # Create 1 element list
+                          lambda value_list, value: value_list + [value],  # Add 1 element to list
+                          lambda value_list_a, value_list_b: value_list_a + value_list_b)  # Add two lists together
+
+    result_as_map = rdd_filtered.collectAsMap()
+
+    return result_as_map
+
+
+def determine_parllelism(num_tiles):
+    """
+    Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions.
+    Also, don't go below the default of 8
+    """
+    num_partitions = max(min(num_tiles / 140, 128), 8)
+    return num_partitions
+
+
+def add_meters_to_lon_lat(lon, lat, meters):
+    """
+    Uses a simple approximation of
+    1 degree latitude = 111,111 meters
+    1 degree longitude = 111,111 meters * cosine(latitude)
+    :param lon: longitude to add meters to
+    :param lat: latitude to add meters to
+    :param meters: meters to add to the longitude and latitude values
+    :return: (longitude, latitude) increased by given meters
+    """
+    longitude = lon + ((meters / 111111) * cos(radians(lat)))
+    latitude = lat + (meters / 111111)
+
+    return longitude, latitude
+
+
+def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, tt_b, rt_b, platforms_b,
+                              bounding_wkt_b, depth_min_b, depth_max_b):
+    the_time = datetime.now()
+    tile_ids = list(tile_ids)
+    if len(tile_ids) == 0:
+        return []
+    tile_service = NexusTileService()
+
+    # Determine the spatial temporal extents of this partition of tiles
+    tiles_bbox = tile_service.get_bounding_box(tile_ids)
+    tiles_min_time = tile_service.get_min_time(tile_ids)
+    tiles_max_time = tile_service.get_max_time(tile_ids)
+
+    # Increase spatial extents by the radius tolerance
+    matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1],
+                                                             -1 * rt_b.value)
+    matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(tiles_bbox.bounds[2], tiles_bbox.bounds[3], rt_b.value)
+
+    # Don't go outside of the search domain
+    search_min_x, search_min_y, search_max_x, search_max_y = wkt.loads(bounding_wkt_b.value).bounds
+    matchup_min_lon = max(matchup_min_lon, search_min_x)
+    matchup_min_lat = max(matchup_min_lat, search_min_y)
+    matchup_max_lon = min(matchup_max_lon, search_max_x)
+    matchup_max_lat = min(matchup_max_lat, search_max_y)
+
+    # Find the centroid of the matchup bounding box and initialize the projections
+    matchup_center = box(matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat).centroid.coords[0]
+    aeqd_proj = pyproj.Proj(proj='aeqd', lon_0=matchup_center[0], lat_0=matchup_center[1])
+    lonlat_proj = pyproj.Proj(proj='lonlat')
+
+    # Increase temporal extents by the time tolerance
+    matchup_min_time = tiles_min_time - tt_b.value
+    matchup_max_time = tiles_max_time + tt_b.value
+    print "%s Time to determine spatial-temporal extents for partition %s to %s" % (
+        str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
+
+    # Query edge for all points within the spatial-temporal extents of this partition
+    the_time = datetime.now()
+    edge_session = requests.Session()
+    edge_results = []
+    with edge_session:
+        for insitudata_name in matchup_b.value.split(','):
+            bbox = ','.join(
+                [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)])
+            edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox,
+                                       platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session)
+            if edge_response['totalResults'] == 0:
+                continue
+            r = edge_response['results']
+            for p in r:
+                p['source'] = insitudata_name
+            edge_results.extend(r)
+    print "%s Time to call edge for partition %s to %s" % (str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
+    if len(edge_results) == 0:
+        return []
+
+    # Convert edge points to utm
+    the_time = datetime.now()
+    matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
+    for n, edge_point in enumerate(edge_results):
+        try:
+            x, y = wkt.loads(edge_point['point']).coords[0]
+        except WKTReadingError:
+            try:
+                x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
+            except ValueError:
+                y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
+
+        matchup_points[n][0], matchup_points[n][1] = pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=x, y=y)
+    print "%s Time to convert match points for partition %s to %s" % (
+        str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
+
+    # Build kdtree from matchup points
+    the_time = datetime.now()
+    m_tree = spatial.cKDTree(matchup_points, leafsize=30)
+    print "%s Time to build matchup tree" % (str(datetime.now() - the_time))
+
+    # The actual matching happens in the generator. This is so that we only load 1 tile into memory at a time
+    match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value,
+                                                      parameter_b.value, rt_b.value, lonlat_proj, aeqd_proj) for tile_id
+                        in tile_ids]
+
+    return chain(*match_generators)
+
+
+def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt,
+                                  search_parameter, radius_tolerance, lonlat_proj, aeqd_proj):
+    from nexustiles.model.nexusmodel import NexusPoint
+    from webservice.algorithms_spark.Matchup import DomsPoint  # Must import DomsPoint or Spark complains
+
+    # Load tile
+    try:
+        the_time = datetime.now()
+        tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt),
+                                                  tile_service.find_tile_by_id(tile_id))[0]
+        print "%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id)
+    except IndexError:
+        # This should only happen if all measurements in a tile become masked after applying the bounding polygon
+        raise StopIteration
+
+    # Convert valid tile lat,lon tuples to UTM tuples
+    the_time = datetime.now()
+    # Get list of indices of valid values
+    valid_indices = tile.get_indices()
+    primary_points = np.array(
+        [pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=tile.longitudes[aslice[2]], y=tile.latitudes[aslice[1]]) for
+         aslice in valid_indices])
+
+    print "%s Time to convert primary points for tile %s" % (str(datetime.now() - the_time), tile_id)
+
+    a_time = datetime.now()
+    p_tree = spatial.cKDTree(primary_points, leafsize=30)
+    print "%s Time to build primary tree" % (str(datetime.now() - a_time))
+
+    a_time = datetime.now()
+    matched_indexes = p_tree.query_ball_tree(m_tree, radius_tolerance)
+    print "%s Time to query primary tree for tile %s" % (str(datetime.now() - a_time), tile_id)
+    for i, point_matches in enumerate(matched_indexes):
+        if len(point_matches) > 0:
+            p_nexus_point = NexusPoint(tile.latitudes[valid_indices[i][1]],
+                                       tile.longitudes[valid_indices[i][2]], None,
+                                       tile.times[valid_indices[i][0]], valid_indices[i],
+                                       tile.data[tuple(valid_indices[i])])
+            p_doms_point = DomsPoint.from_nexus_point(p_nexus_point, tile=tile, parameter=search_parameter)
+            for m_point_index in point_matches:
+                m_doms_point = DomsPoint.from_edge_point(edge_results[m_point_index])
+                yield p_doms_point, m_doms_point
+
+
+def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, itemsPerPage=1000,
+               startIndex=0, stats=True, session=None):
+    try:
+        startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+    except TypeError:
+        # Assume we were passed a properly formatted string
+        pass
+
+    try:
+        endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+    except TypeError:
+        # Assume we were passed a properly formatted string
+        pass
+
+    try:
+        platform = platform.split(',')
+    except AttributeError:
+        # Assume we were passed a list
+        pass
+
+    params = {"variable": variable,
+              "startTime": startTime,
+              "endTime": endTime,
+              "bbox": bbox,
+              "minDepth": depth_min,
+              "maxDepth": depth_max,
+              "platform": platform,
+              "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
+
+    if session is not None:
+        edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+    else:
+        edge_request = requests.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+
+    edge_request.raise_for_status()
+    edge_response = json.loads(edge_request.text)
+
+    # Get all edge results
+    next_page_url = edge_response.get('next', None)
+    while next_page_url is not None:
+        if session is not None:
+            edge_page_request = session.get(next_page_url)
+        else:
+            edge_page_request = requests.get(next_page_url)
+
+        edge_page_request.raise_for_status()
+        edge_page_response = json.loads(edge_page_request.text)
+
+        edge_response['results'].extend(edge_page_response['results'])
+
+        next_page_url = edge_page_response.get('next', None)
+
+    return edge_response