Posted to commits@sdap.apache.org by tl...@apache.org on 2020/08/11 01:59:49 UTC
[incubator-sdap-nexus] 26/28: revert
This is an automated email from the ASF dual-hosted git repository.
tloubrieu pushed a commit to branch SDAP-268
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit c905c373c33b71d116d26465254e0012658f114d
Author: Eamon Ford <ea...@gmail.com>
AuthorDate: Wed Aug 5 15:27:27 2020 -0700
revert
---
analysis/tests/algorithms_spark/Matchup_test.py | 321 +++++++++++
analysis/tests/algorithms_spark/__init__.py | 16 +
analysis/webservice/algorithms_spark/Matchup.py | 703 ++++++++++++++++++++++++
3 files changed, 1040 insertions(+)
diff --git a/analysis/tests/algorithms_spark/Matchup_test.py b/analysis/tests/algorithms_spark/Matchup_test.py
new file mode 100644
index 0000000..5dee17c
--- /dev/null
+++ b/analysis/tests/algorithms_spark/Matchup_test.py
@@ -0,0 +1,321 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import pickle
+import random
+import timeit
+import unittest
+
+from webservice.algorithms_spark.Matchup import *
+
+
+class TestMatch_Points(unittest.TestCase):
+ def test_one_point_match_exact(self):
+ primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
+ matchup = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=2)
+
+ primary_points = [primary]
+ matchup_points = [matchup]
+
+ matches = list(match_points_generator(primary_points, matchup_points, 0))
+
+ self.assertEqual(1, len(matches))
+
+ p_match_point, match = matches[0]
+
+ self.assertEqual(primary, p_match_point)
+ self.assertEqual(matchup, match)
+
+ def test_one_point_match_within_tolerance_150km(self):
+ primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
+ matchup = DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=5.0, data_id=2)
+
+ primary_points = [primary]
+ matchup_points = [matchup]
+
+ matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km
+
+ self.assertEqual(1, len(matches))
+
+ p_match_point, match = matches[0]
+
+ self.assertEqual(primary, p_match_point)
+ self.assertEqual(matchup, match)
+
+ def test_one_point_match_within_tolerance_200m(self):
+ primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
+ matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000, depth=5.0, data_id=2)
+
+ primary_points = [primary]
+ matchup_points = [matchup]
+
+ matches = list(match_points_generator(primary_points, matchup_points, 200)) # tolerance 200 m
+
+ self.assertEqual(1, len(matches))
+
+ p_match_point, match = matches[0]
+
+ self.assertEqual(primary, p_match_point)
+ self.assertEqual(matchup, match)
+
+ def test_one_point_not_match_tolerance_150km(self):
+ primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
+ matchup = DomsPoint(longitude=1.0, latitude=4.0, time=1000, depth=5.0, data_id=2)
+
+ primary_points = [primary]
+ matchup_points = [matchup]
+
+ matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km
+
+ self.assertEqual(0, len(matches))
+
+ def test_one_point_not_match_tolerance_100m(self):
+ primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
+ matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000, depth=5.0, data_id=2)
+
+ primary_points = [primary]
+ matchup_points = [matchup]
+
+ matches = list(match_points_generator(primary_points, matchup_points, 100)) # tolerance 100 m
+
+ self.assertEqual(0, len(matches))
+
+ def test_multiple_point_match(self):
+ primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
+ primary_points = [primary]
+
+ matchup_points = [
+ DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0, data_id=2),
+ DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0, data_id=3),
+ DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0, data_id=4)
+ ]
+
+ matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km
+
+ self.assertEqual(3, len(matches))
+
+ self.assertSetEqual({primary}, {x[0] for x in matches})
+
+ list_of_matches = [x[1] for x in matches]
+
+ self.assertEqual(3, len(list_of_matches))
+ self.assertItemsEqual(matchup_points, list_of_matches)
+
+ def test_multiple_point_match_multiple_times(self):
+ primary_points = [
+ DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1),
+ DomsPoint(longitude=1.5, latitude=1.5, time=1000, depth=5.0, data_id=2)
+ ]
+
+ matchup_points = [
+ DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0, data_id=3),
+ DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0, data_id=4),
+ DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0, data_id=5)
+ ]
+
+ matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km
+
+ self.assertEqual(5, len(matches))
+
+ self.assertSetEqual({p for p in primary_points}, {x[0] for x in matches})
+
+ # First primary point matches all 3 secondary
+ self.assertEqual(3, [x[0] for x in matches].count(primary_points[0]))
+ self.assertItemsEqual(matchup_points, [x[1] for x in matches if x[0] == primary_points[0]])
+
+ # Second primary point matches only last 2 secondary
+ self.assertEqual(2, [x[0] for x in matches].count(primary_points[1]))
+ self.assertItemsEqual(matchup_points[1:], [x[1] for x in matches if x[0] == primary_points[1]])
+
+ def test_one_of_many_primary_matches_one_of_many_matchup(self):
+ primary_points = [
+ DomsPoint(longitude=-33.76764, latitude=30.42946, time=1351553994, data_id=1),
+ DomsPoint(longitude=-33.75731, latitude=29.86216, time=1351554004, data_id=2)
+ ]
+
+ matchup_points = [
+ DomsPoint(longitude=-33.762, latitude=28.877, time=1351521432, depth=3.973, data_id=3),
+ DomsPoint(longitude=-34.916, latitude=28.879, time=1351521770, depth=2.9798, data_id=4),
+ DomsPoint(longitude=-31.121, latitude=31.256, time=1351519892, depth=4.07, data_id=5)
+ ]
+
+ matches = list(match_points_generator(primary_points, matchup_points, 110000)) # tolerance 110 km
+
+ self.assertEqual(1, len(matches))
+
+ self.assertSetEqual({p for p in primary_points if p.data_id == 2}, {x[0] for x in matches})
+
+ # First primary point matches none
+ self.assertEqual(0, [x[0] for x in matches].count(primary_points[0]))
+
+ # Second primary point matches only first secondary
+ self.assertEqual(1, [x[0] for x in matches].count(primary_points[1]))
+ self.assertItemsEqual(matchup_points[0:1], [x[1] for x in matches if x[0] == primary_points[1]])
+
+ @unittest.skip("This test is just for timing, doesn't actually assert anything.")
+ def test_time_many_primary_many_matchup(self):
+ import logging
+ import sys
+ logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+ datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
+ log = logging.getLogger(__name__)
+ # Generate 160000 DomsPoints distributed equally in a box from -2.0 lat/lon to 2.0 lat/lon
+ log.info("Generating primary points")
+ x = np.arange(-2.0, 2.0, 0.01)
+ y = np.arange(-2.0, 2.0, 0.01)
+ primary_points = [DomsPoint(longitude=xy[0], latitude=xy[1], time=1000, depth=5.0, data_id=i) for i, xy in
+ enumerate(np.array(np.meshgrid(x, y)).T.reshape(-1, 2))]
+
+ # Generate 2000 DomsPoints distributed randomly in a box from -2.0 lat/lon to 2.0 lat/lon
+ log.info("Generating matchup points")
+ matchup_points = [
+ DomsPoint(longitude=random.uniform(-2.0, 2.0), latitude=random.uniform(-2.0, 2.0), time=1000, depth=5.0,
+ data_id=i) for i in xrange(0, 2000)]
+
+ log.info("Starting matchup")
+ log.info("Best of repeat(3, 2) matchups: %s seconds" % min(
+ timeit.repeat(lambda: list(match_points_generator(primary_points, matchup_points, 1500)), repeat=3,
+ number=2)))
+
+
+class TestDOMSPoint(unittest.TestCase):
+ def test_is_pickleable(self):
+ edge_point = json.loads("""{
+"id": "argo-profiles-5903995(46, 0)",
+"time": "2012-10-15T14:24:04Z",
+"point": "-33.467 29.728",
+"sea_water_temperature": 24.5629997253,
+"sea_water_temperature_depth": 2.9796258642,
+"wind_speed": null,
+"sea_water_salinity": null,
+"sea_water_salinity_depth": null,
+"platform": 4,
+"device": 3,
+"fileurl": "ftp://podaac-ftp.jpl.nasa.gov/allData/insitu/L2/spurs1/argo/argo-profiles-5903995.nc"
+}""")
+ point = DomsPoint.from_edge_point(edge_point)
+ self.assertIsNotNone(pickle.dumps(point))
+
+
+def check_all():
+ return check_solr() and check_cass() and check_edge()
+
+
+def check_solr():
+ # TODO eventually this might do something.
+ return False
+
+
+def check_cass():
+ # TODO eventually this might do something.
+ return False
+
+
+def check_edge():
+ # TODO eventually this might do something.
+ return False
+
+
+@unittest.skipUnless(check_all(),
+ "These tests require local instances of Solr, Cassandra, and Edge to be running.")
+class TestMatchup(unittest.TestCase):
+ def setUp(self):
+ from os import environ
+ environ['PYSPARK_DRIVER_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
+ environ['PYSPARK_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
+ environ['SPARK_HOME'] = '/Users/greguska/sandbox/spark-2.0.0-bin-hadoop2.7'
+
+ def test_mur_match(self):
+ from shapely.wkt import loads
+ from nexustiles.nexustiles import NexusTileService
+
+ polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))")
+ primary_ds = "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1"
+ matchup_ds = "spurs"
+ parameter = "sst"
+ start_time = 1350259200 # 2012-10-15T00:00:00Z
+ end_time = 1350345600 # 2012-10-16T00:00:00Z
+ time_tolerance = 86400
+ depth_tolerance = 5.0
+ radius_tolerance = 1500.0
+ platforms = "1,2,3,4,5,6,7,8,9"
+
+ tile_service = NexusTileService()
+ tile_ids = [tile.tile_id for tile in
+ tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False,
+ fl='id')]
+ result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter,
+ depth_min=None, depth_max=depth_tolerance, time_tolerance=time_tolerance,
+ radius_tolerance=radius_tolerance, platforms=platforms, match_once=False)
+ for k, v in result.iteritems():
+ print "primary: %s\n\tmatches:\n\t\t%s" % (
+ "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst),
+ '\n\t\t'.join(
+ ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v]))
+
+ def test_smap_match(self):
+ from shapely.wkt import loads
+ from nexustiles.nexustiles import NexusTileService
+
+ polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))")
+ primary_ds = "SMAP_L2B_SSS"
+ matchup_ds = "spurs"
+ parameter = "sss"
+ start_time = 1350259200 # 2012-10-15T00:00:00Z
+ end_time = 1350345600 # 2012-10-16T00:00:00Z
+ time_tolerance = 86400
+ depth_tolerance = 5.0
+ radius_tolerance = 1500.0
+ platforms = "1,2,3,4,5,6,7,8,9"
+
+ tile_service = NexusTileService()
+ tile_ids = [tile.tile_id for tile in
+ tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False,
+ fl='id')]
+ result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter,
+ depth_min=None, depth_max=depth_tolerance, time_tolerance=time_tolerance,
+ radius_tolerance=radius_tolerance, platforms=platforms, match_once=False)
+ for k, v in result.iteritems():
+ print "primary: %s\n\tmatches:\n\t\t%s" % (
+ "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst),
+ '\n\t\t'.join(
+ ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v]))
+
+ def test_ascatb_match(self):
+ from shapely.wkt import loads
+ from nexustiles.nexustiles import NexusTileService
+
+ polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))")
+ primary_ds = "ASCATB-L2-Coastal"
+ matchup_ds = "spurs"
+ parameter = "wind"
+ start_time = 1351468800 # 2012-10-29T00:00:00Z
+ end_time = 1351555200 # 2012-10-30T00:00:00Z
+ time_tolerance = 86400
+ depth_tolerance = 5.0
+ radius_tolerance = 110000.0 # 110 km
+ platforms = "1,2,3,4,5,6,7,8,9"
+
+ tile_service = NexusTileService()
+ tile_ids = [tile.tile_id for tile in
+ tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False,
+ fl='id')]
+ result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter,
+ depth_min=None, depth_max=depth_tolerance, time_tolerance=time_tolerance,
+ radius_tolerance=radius_tolerance, platforms=platforms, match_once=False)
+ for k, v in result.iteritems():
+ print "primary: %s\n\tmatches:\n\t\t%s" % (
+ "lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (k.longitude, k.latitude, k.time, k.wind_u, k.wind_v),
+ '\n\t\t'.join(
+ ["lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (
+ i.longitude, i.latitude, i.time, i.wind_u, i.wind_v) for i in v]))
diff --git a/analysis/tests/algorithms_spark/__init__.py b/analysis/tests/algorithms_spark/__init__.py
new file mode 100644
index 0000000..0707368
--- /dev/null
+++ b/analysis/tests/algorithms_spark/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
new file mode 100644
index 0000000..9ae7557
--- /dev/null
+++ b/analysis/webservice/algorithms_spark/Matchup.py
@@ -0,0 +1,703 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+
+import json
+import logging
+import threading
+from datetime import datetime
+from itertools import chain
+from math import cos, radians
+
+import numpy as np
+import pyproj
+import requests
+from nexustiles.nexustiles import NexusTileService
+from pytz import timezone, UTC
+from scipy import spatial
+from shapely import wkt
+from shapely.geometry import Point
+from shapely.geometry import box
+from shapely.geos import WKTReadingError
+
+from webservice.NexusHandler import nexus_handler
+from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
+from webservice.algorithms.doms import config as edge_endpoints
+from webservice.algorithms.doms import values as doms_values
+from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults
+from webservice.algorithms.doms.ResultsStorage import ResultsStorage
+from webservice.webmodel import NexusProcessingException
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+
+def iso_time_to_epoch(str_time):
+ return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace(
+ tzinfo=UTC) - EPOCH).total_seconds()
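+# For example, using the epoch constants cited in the tests for this commit:
+# iso_time_to_epoch("1970-01-01T00:00:01Z") returns 1.0 and
+# iso_time_to_epoch("2012-10-15T00:00:00Z") returns 1350259200.0.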
+
+
+@nexus_handler
+class Matchup(NexusCalcSparkHandler):
+ name = "Matchup"
+ path = "/match_spark"
+ description = "Match measurements between two or more datasets"
+
+ params = {
+ "primary": {
+ "name": "Primary Dataset",
+ "type": "string",
+ "description": "The Primary dataset used to find matches for. Required"
+ },
+ "matchup": {
+ "name": "Match-Up Datasets",
+ "type": "comma-delimited string",
+ "description": "The Dataset(s) being searched for measurements that match the Primary. Required"
+ },
+ "parameter": {
+ "name": "Match-Up Parameter",
+ "type": "string",
+ "description": "The parameter of interest used for the match up. One of 'sst', 'sss', 'wind'. Required"
+ },
+ "startTime": {
+ "name": "Start Time",
+ "type": "string",
+ "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "endTime": {
+ "name": "End Time",
+ "type": "string",
+ "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
+ },
+ "b": {
+ "name": "Bounding box",
+ "type": "comma-delimited float",
+ "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
+ "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
+ },
+ "depthMin": {
+ "name": "Minimum Depth",
+ "type": "float",
+ "description": "Minimum depth of measurements. Must be less than depthMax. Optional. Default: no limit"
+ },
+ "depthMax": {
+ "name": "Maximum Depth",
+ "type": "float",
+ "description": "Maximum depth of measurements. Must be greater than depthMin. Optional. Default: no limit"
+ },
+ "tt": {
+ "name": "Time Tolerance",
+ "type": "long",
+ "description": "Tolerance in time (seconds) when comparing two measurements. Optional. Default: 86400"
+ },
+ "rt": {
+ "name": "Radius Tolerance",
+ "type": "float",
+ "description": "Tolerance in radius (meters) when comparing two measurements. Optional. Default: 1000"
+ },
+ "platforms": {
+ "name": "Platforms",
+ "type": "comma-delimited integer",
+ "description": "Platforms to include for matchup consideration. Required"
+ },
+ "matchOnce": {
+ "name": "Match Once",
+ "type": "boolean",
+ "description": "Optional True/False flag used to determine if more than one match per primary point is returned. "
+ + "If true, only the nearest point will be returned for each primary point. "
+ + "If false, all points within the tolerances will be returned for each primary point. Default: False"
+ },
+ "resultSizeLimit": {
+ "name": "Result Size Limit",
+ "type": "int",
+ "description": "Optional integer value that limits the number of results returned from the matchup. "
+ "If the number of primary matches is greater than this limit, the service will respond with "
+ "(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. "
+ "Default: 500"
+ }
+ }
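+ # Illustrative request (a sketch only; the dataset names are the ones used in
+ # the tests for this commit, and the host prefix is omitted):
+ #
+ # /match_spark?primary=JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1&matchup=spurs
+ # &parameter=sst&startTime=2012-10-15T00:00:00Z&endTime=2012-10-16T00:00:00Z
+ # &b=-34.98,29.54,-30.1,31.0&tt=86400&rt=1000&platforms=1,2,3,4,5,6,7,8,9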
+ singleton = True
+
+ def __init__(self, algorithm_config=None, sc=None):
+ NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, skipCassandra=True)
+ self.log = logging.getLogger(__name__)
+
+ def parse_arguments(self, request):
+ # Parse input arguments
+ self.log.debug("Parsing arguments")
+ try:
+ bounding_polygon = request.get_bounding_polygon()
+ except:
+ raise NexusProcessingException(
+ reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
+ code=400)
+ primary_ds_name = request.get_argument('primary', None)
+ if primary_ds_name is None:
+ raise NexusProcessingException(reason="'primary' argument is required", code=400)
+ matchup_ds_names = request.get_argument('matchup', None)
+ if matchup_ds_names is None:
+ raise NexusProcessingException(reason="'matchup' argument is required", code=400)
+
+ parameter_s = request.get_argument('parameter', 'sst')
+ if parameter_s not in ['sst', 'sss', 'wind']:
+ raise NexusProcessingException(
+ reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
+
+ try:
+ start_time = request.get_start_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+ try:
+ end_time = request.get_end_datetime()
+ except:
+ raise NexusProcessingException(
+ reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
+ code=400)
+
+ if start_time > end_time:
+ raise NexusProcessingException(
+ reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
+ request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
+ code=400)
+
+ depth_min = request.get_decimal_arg('depthMin', default=None)
+ depth_max = request.get_decimal_arg('depthMax', default=None)
+
+ if depth_min is not None and depth_max is not None and depth_min >= depth_max:
+ raise NexusProcessingException(
+ reason="Depth Min should be less than Depth Max", code=400)
+
+ time_tolerance = request.get_int_arg('tt', default=86400)
+ radius_tolerance = request.get_decimal_arg('rt', default=1000.0)
+ platforms = request.get_argument('platforms', None)
+ if platforms is None:
+ raise NexusProcessingException(reason="'platforms' argument is required", code=400)
+ try:
+ p_validation = platforms.split(',')
+ p_validation = [int(p) for p in p_validation]
+ del p_validation
+ except:
+ raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
+
+ match_once = request.get_boolean_arg("matchOnce", default=False)
+
+ result_size_limit = request.get_int_arg("resultSizeLimit", default=500)
+
+ start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
+ end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
+
+ return bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
+ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
+ depth_min, depth_max, time_tolerance, radius_tolerance, \
+ platforms, match_once, result_size_limit
+
+ def calc(self, request, **args):
+ start = datetime.utcnow()
+ # TODO Assuming Satellite primary
+ bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
+ start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
+ depth_min, depth_max, time_tolerance, radius_tolerance, \
+ platforms, match_once, result_size_limit = self.parse_arguments(request)
+
+ with ResultsStorage() as resultsStorage:
+
+ execution_id = str(resultsStorage.insertExecution(None, start, None, None))
+
+ self.log.debug("Querying for tiles in search domain")
+ # Get tile ids in box
+ tile_ids = [tile.tile_id for tile in
+ self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
+ start_seconds_from_epoch, end_seconds_from_epoch,
+ fetch_data=False, fl='id',
+ sort=['tile_min_time_dt asc', 'tile_min_lon asc',
+ 'tile_min_lat asc'], rows=5000)]
+
+ # Call spark_matchup
+ self.log.debug("Calling Spark Driver")
+ try:
+ spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
+ matchup_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
+ radius_tolerance, platforms, match_once, sc=self._sc)
+ except Exception as e:
+ self.log.exception(e)
+ raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
+
+ end = datetime.utcnow()
+
+ self.log.debug("Building and saving results")
+ args = {
+ "primary": primary_ds_name,
+ "matchup": matchup_ds_names,
+ "startTime": start_time,
+ "endTime": end_time,
+ "bbox": request.get_argument('b'),
+ "timeTolerance": time_tolerance,
+ "radiusTolerance": float(radius_tolerance),
+ "platforms": platforms,
+ "parameter": parameter_s
+ }
+
+ if depth_min is not None:
+ args["depthMin"] = float(depth_min)
+
+ if depth_max is not None:
+ args["depthMax"] = float(depth_max)
+
+ total_keys = len(spark_result.keys())
+ total_values = sum(len(v) for v in spark_result.itervalues())
+ details = {
+ "timeToComplete": int((end - start).total_seconds()),
+ "numInSituRecords": 0,
+ "numInSituMatched": total_values,
+ "numGriddedChecked": 0,
+ "numGriddedMatched": total_keys
+ }
+
+ matches = Matchup.convert_to_matches(spark_result)
+
+ def do_result_insert():
+ with ResultsStorage() as storage:
+ storage.insertResults(results=matches, params=args, stats=details,
+ startTime=start, completeTime=end, userEmail="",
+ execution_id=execution_id)
+
+ threading.Thread(target=do_result_insert).start()
+
+ if 0 < result_size_limit < len(matches):
+ result = DomsQueryResults(results=None, args=args, details=details, bounds=None, count=None,
+ computeOptions=None, executionId=execution_id, status_code=202)
+ else:
+ result = DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None,
+ computeOptions=None, executionId=execution_id)
+
+ return result
+
+ @classmethod
+ def convert_to_matches(cls, spark_result):
+ matches = []
+ for primary_domspoint, matched_domspoints in spark_result.iteritems():
+ p_matched = [cls.domspoint_to_dict(p_match) for p_match in matched_domspoints]
+
+ primary = cls.domspoint_to_dict(primary_domspoint)
+ primary['matches'] = list(p_matched)
+ matches.append(primary)
+ return matches
+
+ @staticmethod
+ def domspoint_to_dict(domspoint):
+ return {
+ "sea_water_temperature": domspoint.sst,
+ "sea_water_temperature_depth": domspoint.sst_depth,
+ "sea_water_salinity": domspoint.sss,
+ "sea_water_salinity_depth": domspoint.sss_depth,
+ "wind_speed": domspoint.wind_speed,
+ "wind_direction": domspoint.wind_direction,
+ "wind_u": domspoint.wind_u,
+ "wind_v": domspoint.wind_v,
+ "platform": doms_values.getPlatformById(domspoint.platform),
+ "device": doms_values.getDeviceById(domspoint.device),
+ "x": str(domspoint.longitude),
+ "y": str(domspoint.latitude),
+ "point": "Point(%s %s)" % (domspoint.longitude, domspoint.latitude),
+ "time": datetime.strptime(domspoint.time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC),
+ "fileurl": domspoint.file_url,
+ "id": domspoint.data_id,
+ "source": domspoint.source,
+ }
+
+
+class DomsPoint(object):
+ def __init__(self, longitude=None, latitude=None, time=None, depth=None, data_id=None):
+
+ self.time = time
+ self.longitude = longitude
+ self.latitude = latitude
+ self.depth = depth
+ self.data_id = data_id
+
+ self.wind_u = None
+ self.wind_v = None
+ self.wind_direction = None
+ self.wind_speed = None
+ self.sst = None
+ self.sst_depth = None
+ self.sss = None
+ self.sss_depth = None
+ self.source = None
+ self.platform = None
+ self.device = None
+ self.file_url = None
+
+ def __repr__(self):
+ return str(self.__dict__)
+
+ @staticmethod
+ def from_nexus_point(nexus_point, tile=None, parameter='sst'):
+ point = DomsPoint()
+
+ point.data_id = "%s[%s]" % (tile.tile_id, nexus_point.index)
+
+ # TODO Not an ideal solution; but it works for now.
+ if parameter == 'sst':
+ point.sst = nexus_point.data_val.item()
+ elif parameter == 'sss':
+ point.sss = nexus_point.data_val.item()
+ elif parameter == 'wind':
+ point.wind_u = nexus_point.data_val.item()
+ try:
+ point.wind_v = tile.meta_data['wind_v'][tuple(nexus_point.index)].item()
+ except (KeyError, IndexError):
+ pass
+ try:
+ point.wind_direction = tile.meta_data['wind_dir'][tuple(nexus_point.index)].item()
+ except (KeyError, IndexError):
+ pass
+ try:
+ point.wind_speed = tile.meta_data['wind_speed'][tuple(nexus_point.index)].item()
+ except (KeyError, IndexError):
+ pass
+ else:
+ raise NotImplementedError('%s not supported. Only sst, sss, and wind parameters are supported.' % parameter)
+
+ point.longitude = nexus_point.longitude.item()
+ point.latitude = nexus_point.latitude.item()
+
+ point.time = datetime.utcfromtimestamp(nexus_point.time).strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ try:
+ point.depth = nexus_point.depth
+ except KeyError:
+ # No depth associated with this measurement
+ pass
+
+ point.sst_depth = 0
+ point.source = tile.dataset
+ point.file_url = tile.granule
+
+ # TODO device should change based on the satellite making the observations.
+ point.platform = 9
+ point.device = 5
+ return point
+
+ @staticmethod
+ def from_edge_point(edge_point):
+ point = DomsPoint()
+
+ try:
+ x, y = wkt.loads(edge_point['point']).coords[0]
+ except WKTReadingError:
+ try:
+ x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
+ except ValueError:
+ y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
+
+ point.longitude = x
+ point.latitude = y
+
+ point.time = edge_point['time']
+
+ point.wind_u = edge_point.get('eastward_wind')
+ point.wind_v = edge_point.get('northward_wind')
+ point.wind_direction = edge_point.get('wind_direction')
+ point.wind_speed = edge_point.get('wind_speed')
+ point.sst = edge_point.get('sea_water_temperature')
+ point.sst_depth = edge_point.get('sea_water_temperature_depth')
+ point.sss = edge_point.get('sea_water_salinity')
+ point.sss_depth = edge_point.get('sea_water_salinity_depth')
+ point.source = edge_point.get('source')
+ point.platform = edge_point.get('platform')
+ point.device = edge_point.get('device')
+ point.file_url = edge_point.get('fileurl')
+
+ try:
+ point.data_id = unicode(edge_point['id'])
+ except KeyError:
+ point.data_id = "%s:%s:%s" % (point.time, point.longitude, point.latitude)
+
+ return point
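+ # For instance, the edge JSON used in the pickling test for this commit carries
+ # "point": "-33.467 29.728"; WKT parsing fails, so the space-delimited branch
+ # parses it as longitude -33.467, latitude 29.728.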
+
+
+from threading import Lock
+
+DRIVER_LOCK = Lock()
+
+
+def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_names, parameter, depth_min, depth_max,
+ time_tolerance, radius_tolerance, platforms, match_once, sc=None):
+ from functools import partial
+
+ with DRIVER_LOCK:
+ # Broadcast parameters
+ primary_b = sc.broadcast(primary_ds_name)
+ matchup_b = sc.broadcast(matchup_ds_names)
+ depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None)
+ depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None)
+ tt_b = sc.broadcast(time_tolerance)
+ rt_b = sc.broadcast(float(radius_tolerance))
+ platforms_b = sc.broadcast(platforms)
+ bounding_wkt_b = sc.broadcast(bounding_wkt)
+ parameter_b = sc.broadcast(parameter)
+
+ # Parallelize list of tile ids
+ rdd = sc.parallelize(tile_ids, determine_parallelism(len(tile_ids)))
+
+ # Map Partitions ( list(tile_id) )
+ rdd_filtered = rdd.mapPartitions(
+ partial(match_satellite_to_insitu, primary_b=primary_b, matchup_b=matchup_b, parameter_b=parameter_b, tt_b=tt_b,
+ rt_b=rt_b, platforms_b=platforms_b, bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b,
+ depth_max_b=depth_max_b), preservesPartitioning=True) \
+ .filter(lambda p_m_tuple: abs(
+ iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)) <= time_tolerance)
+
+ if match_once:
+ # Only the 'nearest' point for each primary should be returned. Add an extra map/reduce which calculates
+ # the distance and finds the minimum
+
+ # Method used for calculating the distance between 2 DomsPoints
+ from pyproj import Geod
+
+ def dist(primary, matchup):
+ wgs84_geod = Geod(ellps='WGS84')
+ lat1, lon1 = (primary.latitude, primary.longitude)
+ lat2, lon2 = (matchup.latitude, matchup.longitude)
+ az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2)
+ return distance
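+ # Sanity check (illustrative, not part of the original code): on the WGS84
+ # ellipsoid one degree of latitude at the equator is roughly 110.57 km, so
+ # dist between points at (lat 0, lon 0) and (lat 1, lon 0) is about 110574 meters.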
+
+ rdd_filtered = rdd_filtered \
+ .map(lambda (primary, matchup): tuple([primary, tuple([matchup, dist(primary, matchup)])])) \
+ .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \
+ .mapValues(lambda x: [x[0]])
+ else:
+ rdd_filtered = rdd_filtered \
+ .combineByKey(lambda value: [value], # Create 1 element list
+ lambda value_list, value: value_list + [value], # Add 1 element to list
+ lambda value_list_a, value_list_b: value_list_a + value_list_b) # Add two lists together
+
+ result_as_map = rdd_filtered.collectAsMap()
+
+ return result_as_map
+
+
+def determine_parallelism(num_tiles):
+ """
+ Try to stay at a maximum of 140 tiles per partition, but don't go over 128 partitions.
+ Also, don't go below the default of 8.
+ """
+ num_partitions = max(min(num_tiles / 140, 128), 8)
+ return num_partitions
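+# For example (num_tiles / 140 is Python 2 integer division):
+# determine_parallelism(100) -> max(min(0, 128), 8) = 8
+# determine_parallelism(5000) -> max(min(35, 128), 8) = 35
+# determine_parallelism(50000) -> max(min(357, 128), 8) = 128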
+
+
+def add_meters_to_lon_lat(lon, lat, meters):
+ """
+ Uses a simple approximation of
+ 1 degree latitude = 111,111 meters
+ 1 degree longitude = 111,111 meters * cosine(latitude)
+ :param lon: longitude to add meters to
+ :param lat: latitude to add meters to
+ :param meters: meters to add to the longitude and latitude values
+ :return: (longitude, latitude) increased by given meters
+ """
+ longitude = lon + (meters / (111111 * cos(radians(lat))))
+ latitude = lat + (meters / 111111)
+
+ return longitude, latitude
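+# Worked example: at the equator, add_meters_to_lon_lat(0.0, 0.0, 111111)
+# returns (1.0, 1.0). At 60 degrees latitude, where cos(60) = 0.5, the same
+# 111111 meters spans 2 degrees of longitude but still only 1 degree of latitude.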
+
+
+def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, tt_b, rt_b, platforms_b,
+ bounding_wkt_b, depth_min_b, depth_max_b):
+ the_time = datetime.now()
+ tile_ids = list(tile_ids)
+ if len(tile_ids) == 0:
+ return []
+ tile_service = NexusTileService()
+
+ # Determine the spatial temporal extents of this partition of tiles
+ tiles_bbox = tile_service.get_bounding_box(tile_ids)
+ tiles_min_time = tile_service.get_min_time(tile_ids)
+ tiles_max_time = tile_service.get_max_time(tile_ids)
+
+ # Increase spatial extents by the radius tolerance
+ matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1],
+ -1 * rt_b.value)
+ matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(tiles_bbox.bounds[2], tiles_bbox.bounds[3], rt_b.value)
+
+ # Don't go outside of the search domain
+ search_min_x, search_min_y, search_max_x, search_max_y = wkt.loads(bounding_wkt_b.value).bounds
+ matchup_min_lon = max(matchup_min_lon, search_min_x)
+ matchup_min_lat = max(matchup_min_lat, search_min_y)
+ matchup_max_lon = min(matchup_max_lon, search_max_x)
+ matchup_max_lat = min(matchup_max_lat, search_max_y)
+
+ # Find the centroid of the matchup bounding box and initialize the projections
+ matchup_center = box(matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat).centroid.coords[0]
+ aeqd_proj = pyproj.Proj(proj='aeqd', lon_0=matchup_center[0], lat_0=matchup_center[1])
+ lonlat_proj = pyproj.Proj(proj='lonlat')
+
+ # Increase temporal extents by the time tolerance
+ matchup_min_time = tiles_min_time - tt_b.value
+ matchup_max_time = tiles_max_time + tt_b.value
+ print "%s Time to determine spatial-temporal extents for partition %s to %s" % (
+ str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
+
+ # Query edge for all points within the spatial-temporal extents of this partition
+ the_time = datetime.now()
+ edge_session = requests.Session()
+ edge_results = []
+ with edge_session:
+ for insitudata_name in matchup_b.value.split(','):
+ bbox = ','.join(
+ [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)])
+ edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox,
+ platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session)
+ if edge_response['totalResults'] == 0:
+ continue
+ r = edge_response['results']
+ for p in r:
+ p['source'] = insitudata_name
+ edge_results.extend(r)
+ print "%s Time to call edge for partition %s to %s" % (str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
+ if len(edge_results) == 0:
+ return []
+
+ # Convert edge points to utm
+ the_time = datetime.now()
+ matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
+ for n, edge_point in enumerate(edge_results):
+ try:
+ x, y = wkt.loads(edge_point['point']).coords[0]
+ except WKTReadingError:
+ try:
+ x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
+ except ValueError:
+ y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
+
+ matchup_points[n][0], matchup_points[n][1] = pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=x, y=y)
+ print "%s Time to convert match points for partition %s to %s" % (
+ str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
+
+ # Build kdtree from matchup points
+ the_time = datetime.now()
+ m_tree = spatial.cKDTree(matchup_points, leafsize=30)
+ print "%s Time to build matchup tree" % (str(datetime.now() - the_time))
+
+ # The actual matching happens in the generator. This is so that we only load 1 tile into memory at a time
+ match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value,
+ parameter_b.value, rt_b.value, lonlat_proj, aeqd_proj) for tile_id
+ in tile_ids]
+
+ return chain(*match_generators)
+
+
+def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt,
+ search_parameter, radius_tolerance, lonlat_proj, aeqd_proj):
+ from nexustiles.model.nexusmodel import NexusPoint
+ from webservice.algorithms_spark.Matchup import DomsPoint # Must import DomsPoint or Spark complains
+
+ # Load tile
+ try:
+ the_time = datetime.now()
+ tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt),
+ tile_service.find_tile_by_id(tile_id))[0]
+ print "%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id)
+ except IndexError:
+ # This should only happen if all measurements in a tile become masked after applying the bounding polygon
+ raise StopIteration
+
+ # Convert valid tile lat,lon tuples to UTM tuples
+ the_time = datetime.now()
+ # Get list of indices of valid values
+ valid_indices = tile.get_indices()
+ primary_points = np.array(
+ [pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=tile.longitudes[aslice[2]], y=tile.latitudes[aslice[1]]) for
+ aslice in valid_indices])
+
+ print "%s Time to convert primary points for tile %s" % (str(datetime.now() - the_time), tile_id)
+
+ a_time = datetime.now()
+ p_tree = spatial.cKDTree(primary_points, leafsize=30)
+ print "%s Time to build primary tree" % (str(datetime.now() - a_time))
+
+ a_time = datetime.now()
+ matched_indexes = p_tree.query_ball_tree(m_tree, radius_tolerance)
+ print "%s Time to query primary tree for tile %s" % (str(datetime.now() - a_time), tile_id)
+ for i, point_matches in enumerate(matched_indexes):
+ if len(point_matches) > 0:
+ p_nexus_point = NexusPoint(tile.latitudes[valid_indices[i][1]],
+ tile.longitudes[valid_indices[i][2]], None,
+ tile.times[valid_indices[i][0]], valid_indices[i],
+ tile.data[tuple(valid_indices[i])])
+ p_doms_point = DomsPoint.from_nexus_point(p_nexus_point, tile=tile, parameter=search_parameter)
+ for m_point_index in point_matches:
+ m_doms_point = DomsPoint.from_edge_point(edge_results[m_point_index])
+ yield p_doms_point, m_doms_point
+
+
+def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, itemsPerPage=1000,
+ startIndex=0, stats=True, session=None):
+ try:
+ startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+ except TypeError:
+ # Assume we were passed a properly formatted string
+ pass
+
+ try:
+ endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ')
+ except TypeError:
+ # Assume we were passed a properly formatted string
+ pass
+
+ try:
+ platform = platform.split(',')
+ except AttributeError:
+ # Assume we were passed a list
+ pass
+
+ params = {"variable": variable,
+ "startTime": startTime,
+ "endTime": endTime,
+ "bbox": bbox,
+ "minDepth": depth_min,
+ "maxDepth": depth_max,
+ "platform": platform,
+ "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
+
+ if session is not None:
+ edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+ else:
+ edge_request = requests.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
+
+ edge_request.raise_for_status()
+ edge_response = json.loads(edge_request.text)
+
+ # Get all edge results
+ next_page_url = edge_response.get('next', None)
+ while next_page_url is not None:
+ if session is not None:
+ edge_page_request = session.get(next_page_url)
+ else:
+ edge_page_request = requests.get(next_page_url)
+
+ edge_page_request.raise_for_status()
+ edge_page_response = json.loads(edge_page_request.text)
+
+ edge_response['results'].extend(edge_page_response['results'])
+
+ next_page_url = edge_page_response.get('next', None)
+
+ return edge_response
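+# A minimal usage sketch (the 'spurs' endpoint name comes from the tests for
+# this commit; the shortened platform list is illustrative):
+# response = query_edge('spurs', 'sst', 1350259200, 1350345600,
+# '-34.98,29.54,-30.1,31.0', '1,2,3,4', None, 5.0)
+# print response['totalResults']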