You are viewing a plain-text version of this content. The canonical link to the original archived message was not preserved in this plain-text copy.
Posted to commits@sdap.apache.org by tl...@apache.org on 2020/08/11 01:59:34 UTC
[incubator-sdap-nexus] 11/28: read cli args for cass and solr hosts
This is an automated email from the ASF dual-hosted git repository.
tloubrieu pushed a commit to branch SDAP-268
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 4e3c24ac39704e37515bdaf5927e0ef598593e2a
Author: Eamon Ford <ea...@jpl.nasa.gov>
AuthorDate: Wed Jul 22 16:10:53 2020 -0700
read cli args for cass and solr hosts
---
.gitignore | 1 -
analysis/setup.py | 3 +-
analysis/tests/algorithms_spark/Matchup_test.py | 321 ----------
analysis/tests/algorithms_spark/__init__.py | 16 -
analysis/webservice/algorithms/Capabilities.py | 3 -
analysis/webservice/algorithms/CorrelationMap.py | 3 -
.../algorithms/DailyDifferenceAverage.py | 3 -
.../webservice/algorithms/DataInBoundsSearch.py | 6 -
analysis/webservice/algorithms/DataSeriesList.py | 7 +-
analysis/webservice/algorithms/DelayTest.py | 3 -
analysis/webservice/algorithms/ErrorTosserTest.py | 3 -
analysis/webservice/algorithms/Heartbeat.py | 3 -
analysis/webservice/algorithms/HofMoeller.py | 10 +-
.../webservice/algorithms/LongitudeLatitudeMap.py | 6 -
.../algorithms/StandardDeviationSearch.py | 5 -
analysis/webservice/algorithms/TileSearch.py | 3 -
analysis/webservice/algorithms/TimeAvgMap.py | 3 -
analysis/webservice/algorithms/TimeSeries.py | 22 +-
analysis/webservice/algorithms/TimeSeriesSolr.py | 7 +-
.../webservice/algorithms/doms/BaseDomsHandler.py | 635 -------------------
.../webservice/algorithms/doms/DatasetListQuery.py | 116 ----
.../algorithms/doms/DomsInitialization.py | 164 -----
.../webservice/algorithms/doms/MatchupQuery.py | 452 -------------
.../webservice/algorithms/doms/MetadataQuery.py | 65 --
.../webservice/algorithms/doms/ResultsPlotQuery.py | 55 --
.../webservice/algorithms/doms/ResultsRetrieval.py | 49 --
.../webservice/algorithms/doms/ResultsStorage.py | 286 ---------
analysis/webservice/algorithms/doms/StatsQuery.py | 63 --
analysis/webservice/algorithms/doms/ValuesQuery.py | 72 ---
analysis/webservice/algorithms/doms/__init__.py | 34 -
analysis/webservice/algorithms/doms/config.py | 109 ----
analysis/webservice/algorithms/doms/datafetch.py | 47 --
.../algorithms/doms/domsconfig.ini.default | 15 -
.../webservice/algorithms/doms/fetchedgeimpl.py | 217 -------
analysis/webservice/algorithms/doms/geo.py | 129 ----
.../webservice/algorithms/doms/histogramplot.py | 127 ----
.../webservice/algorithms/doms/insitusubset.py | 263 --------
analysis/webservice/algorithms/doms/mapplot.py | 175 -----
analysis/webservice/algorithms/doms/scatterplot.py | 118 ----
analysis/webservice/algorithms/doms/subsetter.py | 260 --------
analysis/webservice/algorithms/doms/values.py | 72 ---
.../webservice/algorithms/doms/workerthread.py | 61 --
analysis/webservice/algorithms_spark/Matchup.py | 703 ---------------------
.../algorithms_spark/NexusCalcSparkHandler.py | 1 +
analysis/webservice/algorithms_spark/__init__.py | 6 -
analysis/webservice/config/web.ini | 2 +-
analysis/webservice/webapp.py | 3 +-
.../nexustiles/config/datastores.ini.default | 4 +-
data-access/nexustiles/nexustiles.py | 6 +-
helm/templates/webapp.yml | 2 +-
tools/doms/README.md | 66 --
tools/doms/doms_reader.py | 144 -----
52 files changed, 31 insertions(+), 4918 deletions(-)
diff --git a/.gitignore b/.gitignore
index 3e29626..4e4cf6e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,5 @@
*.code-workspace
*.idea
*.DS_Store
-analysis/webservice/algorithms/doms/domsconfig.ini
data-access/nexustiles/config/datastores.ini
venv/
diff --git a/analysis/setup.py b/analysis/setup.py
index 62a6891..9a449ce 100644
--- a/analysis/setup.py
+++ b/analysis/setup.py
@@ -50,8 +50,7 @@ setuptools.setup(
# 'webservice.nexus_tornado.request.renderers'
#],
package_data={
- 'webservice': ['config/web.ini', 'config/algorithms.ini'],
- 'webservice.algorithms.doms': ['domsconfig.ini.default']
+ 'webservice': ['config/web.ini', 'config/algorithms.ini']
},
data_files=[
('static', ['static/index.html'])
diff --git a/analysis/tests/algorithms_spark/Matchup_test.py b/analysis/tests/algorithms_spark/Matchup_test.py
deleted file mode 100644
index 5dee17c..0000000
--- a/analysis/tests/algorithms_spark/Matchup_test.py
+++ /dev/null
@@ -1,321 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import pickle
-import random
-import timeit
-import unittest
-
-from webservice.algorithms_spark.Matchup import *
-
-
-class TestMatch_Points(unittest.TestCase):
- def test_one_point_match_exact(self):
- primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
- matchup = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=2)
-
- primary_points = [primary]
- matchup_points = [matchup]
-
- matches = list(match_points_generator(primary_points, matchup_points, 0))
-
- self.assertEquals(1, len(matches))
-
- p_match_point, match = matches[0]
-
- self.assertEqual(primary, p_match_point)
- self.assertEqual(matchup, match)
-
- def test_one_point_match_within_tolerance_150km(self):
- primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
- matchup = DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=5.0, data_id=2)
-
- primary_points = [primary]
- matchup_points = [matchup]
-
- matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km
-
- self.assertEquals(1, len(matches))
-
- p_match_point, match = matches[0]
-
- self.assertEqual(primary, p_match_point)
- self.assertEqual(matchup, match)
-
- def test_one_point_match_within_tolerance_200m(self):
- primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
- matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000, depth=5.0, data_id=2)
-
- primary_points = [primary]
- matchup_points = [matchup]
-
- matches = list(match_points_generator(primary_points, matchup_points, 200)) # tolerance 200 m
-
- self.assertEquals(1, len(matches))
-
- p_match_point, match = matches[0]
-
- self.assertEqual(primary, p_match_point)
- self.assertEqual(matchup, match)
-
- def test_one_point_not_match_tolerance_150km(self):
- primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
- matchup = DomsPoint(longitude=1.0, latitude=4.0, time=1000, depth=5.0, data_id=2)
-
- primary_points = [primary]
- matchup_points = [matchup]
-
- matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km
-
- self.assertEquals(0, len(matches))
-
- def test_one_point_not_match_tolerance_100m(self):
- primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
- matchup = DomsPoint(longitude=1.001, latitude=2.0, time=1000, depth=5.0, data_id=2)
-
- primary_points = [primary]
- matchup_points = [matchup]
-
- matches = list(match_points_generator(primary_points, matchup_points, 100)) # tolerance 100 m
-
- self.assertEquals(0, len(matches))
-
- def test_multiple_point_match(self):
- primary = DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1)
- primary_points = [primary]
-
- matchup_points = [
- DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0, data_id=2),
- DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0, data_id=3),
- DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0, data_id=4)
- ]
-
- matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km
-
- self.assertEquals(3, len(matches))
-
- self.assertSetEqual({primary}, {x[0] for x in matches})
-
- list_of_matches = [x[1] for x in matches]
-
- self.assertEquals(3, len(list_of_matches))
- self.assertItemsEqual(matchup_points, list_of_matches)
-
- def test_multiple_point_match_multiple_times(self):
- primary_points = [
- DomsPoint(longitude=1.0, latitude=2.0, time=1000, depth=5.0, data_id=1),
- DomsPoint(longitude=1.5, latitude=1.5, time=1000, depth=5.0, data_id=2)
- ]
-
- matchup_points = [
- DomsPoint(longitude=1.0, latitude=3.0, time=1000, depth=10.0, data_id=3),
- DomsPoint(longitude=2.0, latitude=2.0, time=1000, depth=0.0, data_id=4),
- DomsPoint(longitude=0.5, latitude=1.5, time=1000, depth=3.0, data_id=5)
- ]
-
- matches = list(match_points_generator(primary_points, matchup_points, 150000)) # tolerance 150 km
-
- self.assertEquals(5, len(matches))
-
- self.assertSetEqual({p for p in primary_points}, {x[0] for x in matches})
-
- # First primary point matches all 3 secondary
- self.assertEquals(3, [x[0] for x in matches].count(primary_points[0]))
- self.assertItemsEqual(matchup_points, [x[1] for x in matches if x[0] == primary_points[0]])
-
- # Second primary point matches only last 2 secondary
- self.assertEquals(2, [x[0] for x in matches].count(primary_points[1]))
- self.assertItemsEqual(matchup_points[1:], [x[1] for x in matches if x[0] == primary_points[1]])
-
- def test_one_of_many_primary_matches_one_of_many_matchup(self):
- primary_points = [
- DomsPoint(longitude=-33.76764, latitude=30.42946, time=1351553994, data_id=1),
- DomsPoint(longitude=-33.75731, latitude=29.86216, time=1351554004, data_id=2)
- ]
-
- matchup_points = [
- DomsPoint(longitude=-33.762, latitude=28.877, time=1351521432, depth=3.973, data_id=3),
- DomsPoint(longitude=-34.916, latitude=28.879, time=1351521770, depth=2.9798, data_id=4),
- DomsPoint(longitude=-31.121, latitude=31.256, time=1351519892, depth=4.07, data_id=5)
- ]
-
- matches = list(match_points_generator(primary_points, matchup_points, 110000)) # tolerance 110 km
-
- self.assertEquals(1, len(matches))
-
- self.assertSetEqual({p for p in primary_points if p.data_id == 2}, {x[0] for x in matches})
-
- # First primary point matches none
- self.assertEquals(0, [x[0] for x in matches].count(primary_points[0]))
-
- # Second primary point matches only first secondary
- self.assertEquals(1, [x[0] for x in matches].count(primary_points[1]))
- self.assertItemsEqual(matchup_points[0:1], [x[1] for x in matches if x[0] == primary_points[1]])
-
- @unittest.skip("This test is just for timing, doesn't actually assert anything.")
- def test_time_many_primary_many_matchup(self):
- import logging
- import sys
- logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- datefmt="%Y-%m-%dT%H:%M:%S", stream=sys.stdout)
- log = logging.getLogger(__name__)
- # Generate 160000 DomsPoints distributed equally in a box from -2.0 lat/lon to 2.0 lat/lon
- log.info("Generating primary points")
- x = np.arange(-2.0, 2.0, 0.01)
- y = np.arange(-2.0, 2.0, 0.01)
- primary_points = [DomsPoint(longitude=xy[0], latitude=xy[1], time=1000, depth=5.0, data_id=i) for i, xy in
- enumerate(np.array(np.meshgrid(x, y)).T.reshape(-1, 2))]
-
- # Generate 2000 DomsPoints distributed randomly in a box from -2.0 lat/lon to 2.0 lat/lon
- log.info("Generating matchup points")
- matchup_points = [
- DomsPoint(longitude=random.uniform(-2.0, 2.0), latitude=random.uniform(-2.0, 2.0), time=1000, depth=5.0,
- data_id=i) for i in xrange(0, 2000)]
-
- log.info("Starting matchup")
- log.info("Best of repeat(3, 2) matchups: %s seconds" % min(
- timeit.repeat(lambda: list(match_points_generator(primary_points, matchup_points, 1500)), repeat=3,
- number=2)))
-
-
-class TestDOMSPoint(unittest.TestCase):
- def test_is_pickleable(self):
- edge_point = json.loads("""{
-"id": "argo-profiles-5903995(46, 0)",
-"time": "2012-10-15T14:24:04Z",
-"point": "-33.467 29.728",
-"sea_water_temperature": 24.5629997253,
-"sea_water_temperature_depth": 2.9796258642,
-"wind_speed": null,
-"sea_water_salinity": null,
-"sea_water_salinity_depth": null,
-"platform": 4,
-"device": 3,
-"fileurl": "ftp://podaac-ftp.jpl.nasa.gov/allData/insitu/L2/spurs1/argo/argo-profiles-5903995.nc"
-}""")
- point = DomsPoint.from_edge_point(edge_point)
- self.assertIsNotNone(pickle.dumps(point))
-
-
-def check_all():
- return check_solr() and check_cass() and check_edge()
-
-
-def check_solr():
- # TODO eventually this might do something.
- return False
-
-
-def check_cass():
- # TODO eventually this might do something.
- return False
-
-
-def check_edge():
- # TODO eventually this might do something.
- return False
-
-
-@unittest.skipUnless(check_all(),
- "These tests require local instances of Solr, Cassandra, and Edge to be running.")
-class TestMatchup(unittest.TestCase):
- def setUp(self):
- from os import environ
- environ['PYSPARK_DRIVER_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
- environ['PYSPARK_PYTHON'] = '/Users/greguska/anaconda/envs/nexus-analysis/bin/python2.7'
- environ['SPARK_HOME'] = '/Users/greguska/sandbox/spark-2.0.0-bin-hadoop2.7'
-
- def test_mur_match(self):
- from shapely.wkt import loads
- from nexustiles.nexustiles import NexusTileService
-
- polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))")
- primary_ds = "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1"
- matchup_ds = "spurs"
- parameter = "sst"
- start_time = 1350259200 # 2012-10-15T00:00:00Z
- end_time = 1350345600 # 2012-10-16T00:00:00Z
- time_tolerance = 86400
- depth_tolerance = 5.0
- radius_tolerance = 1500.0
- platforms = "1,2,3,4,5,6,7,8,9"
-
- tile_service = NexusTileService()
- tile_ids = [tile.tile_id for tile in
- tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False,
- fl='id')]
- result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance,
- depth_tolerance, radius_tolerance, platforms)
- for k, v in result.iteritems():
- print "primary: %s\n\tmatches:\n\t\t%s" % (
- "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst),
- '\n\t\t'.join(
- ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v]))
-
- def test_smap_match(self):
- from shapely.wkt import loads
- from nexustiles.nexustiles import NexusTileService
-
- polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))")
- primary_ds = "SMAP_L2B_SSS"
- matchup_ds = "spurs"
- parameter = "sss"
- start_time = 1350259200 # 2012-10-15T00:00:00Z
- end_time = 1350345600 # 2012-10-16T00:00:00Z
- time_tolerance = 86400
- depth_tolerance = 5.0
- radius_tolerance = 1500.0
- platforms = "1,2,3,4,5,6,7,8,9"
-
- tile_service = NexusTileService()
- tile_ids = [tile.tile_id for tile in
- tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False,
- fl='id')]
- result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance,
- depth_tolerance, radius_tolerance, platforms)
- for k, v in result.iteritems():
- print "primary: %s\n\tmatches:\n\t\t%s" % (
- "lon: %s, lat: %s, time: %s, sst: %s" % (k.longitude, k.latitude, k.time, k.sst),
- '\n\t\t'.join(
- ["lon: %s, lat: %s, time: %s, sst: %s" % (i.longitude, i.latitude, i.time, i.sst) for i in v]))
-
- def test_ascatb_match(self):
- from shapely.wkt import loads
- from nexustiles.nexustiles import NexusTileService
-
- polygon = loads("POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))")
- primary_ds = "ASCATB-L2-Coastal"
- matchup_ds = "spurs"
- parameter = "wind"
- start_time = 1351468800 # 2012-10-29T00:00:00Z
- end_time = 1351555200 # 2012-10-30T00:00:00Z
- time_tolerance = 86400
- depth_tolerance = 5.0
- radius_tolerance = 110000.0 # 110 km
- platforms = "1,2,3,4,5,6,7,8,9"
-
- tile_service = NexusTileService()
- tile_ids = [tile.tile_id for tile in
- tile_service.find_tiles_in_polygon(polygon, primary_ds, start_time, end_time, fetch_data=False,
- fl='id')]
- result = spark_matchup_driver(tile_ids, wkt.dumps(polygon), primary_ds, matchup_ds, parameter, time_tolerance,
- depth_tolerance, radius_tolerance, platforms)
- for k, v in result.iteritems():
- print "primary: %s\n\tmatches:\n\t\t%s" % (
- "lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (k.longitude, k.latitude, k.time, k.wind_u, k.wind_v),
- '\n\t\t'.join(
- ["lon: %s, lat: %s, time: %s, wind u,v: %s,%s" % (
- i.longitude, i.latitude, i.time, i.wind_u, i.wind_v) for i in v]))
diff --git a/analysis/tests/algorithms_spark/__init__.py b/analysis/tests/algorithms_spark/__init__.py
deleted file mode 100644
index 0707368..0000000
--- a/analysis/tests/algorithms_spark/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
diff --git a/analysis/webservice/algorithms/Capabilities.py b/analysis/webservice/algorithms/Capabilities.py
index f507587..fa85a7c 100644
--- a/analysis/webservice/algorithms/Capabilities.py
+++ b/analysis/webservice/algorithms/Capabilities.py
@@ -29,9 +29,6 @@ class CapabilitiesListCalcHandlerImpl(NexusCalcHandler):
params = {}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
-
def calc(self, computeOptions, **args):
capabilities = []
diff --git a/analysis/webservice/algorithms/CorrelationMap.py b/analysis/webservice/algorithms/CorrelationMap.py
index 1726412..1d8a0ad 100644
--- a/analysis/webservice/algorithms/CorrelationMap.py
+++ b/analysis/webservice/algorithms/CorrelationMap.py
@@ -41,9 +41,6 @@ class LongitudeLatitudeMapCalcHandlerImpl(NexusCalcHandler):
}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
-
def calc(self, computeOptions, **args):
minLat = computeOptions.get_min_lat()
maxLat = computeOptions.get_max_lat()
diff --git a/analysis/webservice/algorithms/DailyDifferenceAverage.py b/analysis/webservice/algorithms/DailyDifferenceAverage.py
index 1b4d642..0ffd83b 100644
--- a/analysis/webservice/algorithms/DailyDifferenceAverage.py
+++ b/analysis/webservice/algorithms/DailyDifferenceAverage.py
@@ -80,9 +80,6 @@ class DailyDifferenceAverageImpl(NexusCalcHandler):
}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=True)
-
def calc(self, request, **args):
min_lat, max_lat, min_lon, max_lon = request.get_min_lat(), request.get_max_lat(), request.get_min_lon(), request.get_max_lon()
dataset1 = request.get_argument("ds1", None)
diff --git a/analysis/webservice/algorithms/DataInBoundsSearch.py b/analysis/webservice/algorithms/DataInBoundsSearch.py
index 2da6891..fa69416 100644
--- a/analysis/webservice/algorithms/DataInBoundsSearch.py
+++ b/analysis/webservice/algorithms/DataInBoundsSearch.py
@@ -14,7 +14,6 @@
# limitations under the License.
-import logging
from datetime import datetime
from pytz import timezone
@@ -67,13 +66,8 @@ class DataInBoundsSearchCalcHandlerImpl(NexusCalcHandler):
}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
- self.log = logging.getLogger(__name__)
-
def parse_arguments(self, request):
# Parse input arguments
- self.log.debug("Parsing arguments")
try:
ds = request.get_dataset()[0]
diff --git a/analysis/webservice/algorithms/DataSeriesList.py b/analysis/webservice/algorithms/DataSeriesList.py
index 16736b2..e9275ed 100644
--- a/analysis/webservice/algorithms/DataSeriesList.py
+++ b/analysis/webservice/algorithms/DataSeriesList.py
@@ -20,6 +20,10 @@ from webservice.algorithms.NexusCalcHandler import NexusCalcHandler
from webservice.NexusHandler import nexus_handler
from webservice.webmodel import cached
+import logging
+
+
+logger = logging.getLogger(__name__)
@nexus_handler
class DataSeriesListCalcHandlerImpl(NexusCalcHandler):
@@ -28,9 +32,6 @@ class DataSeriesListCalcHandlerImpl(NexusCalcHandler):
description = "Lists datasets currently available for analysis"
params = {}
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=True)
-
@cached(ttl=(60 * 60 * 1000)) # 1 hour cached
def calc(self, computeOptions, **args):
class SimpleResult(object):
diff --git a/analysis/webservice/algorithms/DelayTest.py b/analysis/webservice/algorithms/DelayTest.py
index e2c1b30..de56f56 100644
--- a/analysis/webservice/algorithms/DelayTest.py
+++ b/analysis/webservice/algorithms/DelayTest.py
@@ -28,9 +28,6 @@ class DelayCalcHandlerImpl(NexusCalcHandler):
params = {}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
-
def calc(self, computeOptions, **args):
time.sleep(10)
diff --git a/analysis/webservice/algorithms/ErrorTosserTest.py b/analysis/webservice/algorithms/ErrorTosserTest.py
index dc4d617..0100552 100644
--- a/analysis/webservice/algorithms/ErrorTosserTest.py
+++ b/analysis/webservice/algorithms/ErrorTosserTest.py
@@ -26,9 +26,6 @@ class ErrorTosserCalcHandler(NexusCalcHandler):
params = {}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
-
def calc(self, computeOptions, **args):
a = 100 / 0.0
# raise Exception("I'm Mad!")
diff --git a/analysis/webservice/algorithms/Heartbeat.py b/analysis/webservice/algorithms/Heartbeat.py
index ae7fcee..bc1f50f 100644
--- a/analysis/webservice/algorithms/Heartbeat.py
+++ b/analysis/webservice/algorithms/Heartbeat.py
@@ -28,9 +28,6 @@ class HeartbeatCalcHandlerImpl(NexusCalcHandler):
params = {}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=True)
-
def calc(self, computeOptions, **args):
solrOnline = self._get_tile_service().pingSolr()
diff --git a/analysis/webservice/algorithms/HofMoeller.py b/analysis/webservice/algorithms/HofMoeller.py
index 563ea3d..60252ab 100644
--- a/analysis/webservice/algorithms/HofMoeller.py
+++ b/analysis/webservice/algorithms/HofMoeller.py
@@ -39,6 +39,9 @@ LONGITUDE = 1
if not matplotlib.get_backend():
matplotlib.use('Agg')
+logger = logging.getLogger(__name__)
+
+
class LongitudeHofMoellerCalculator(object):
def longitude_time_hofmoeller_stats(self, tile, index):
stat = {
@@ -93,9 +96,6 @@ class LatitudeHofMoellerCalculator(object):
class BaseHoffMoellerCalcHandlerImpl(NexusCalcHandler):
- def __init__(self):
- NexusCalcHandler.__init__(self)
- self.log = logging.getLogger(__name__)
def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True):
shape = (len(results), len(results[0][pivot]))
@@ -168,7 +168,7 @@ class LatitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerCalcHandlerImpl):
result = done_queue.get()
try:
error_str = result['error']
- self.log.error(error_str)
+ logger.error(error_str)
raise NexusProcessingException(reason="Error calculating latitude_time_hofmoeller_stats.")
except KeyError:
pass
@@ -234,7 +234,7 @@ class LongitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerCalcHandlerImpl):
result = done_queue.get()
try:
error_str = result['error']
- self.log.error(error_str)
+ logger.error(error_str)
raise NexusProcessingException(reason="Error calculating longitude_time_hofmoeller_stats.")
except KeyError:
pass
diff --git a/analysis/webservice/algorithms/LongitudeLatitudeMap.py b/analysis/webservice/algorithms/LongitudeLatitudeMap.py
index 3f0467a..031d893 100644
--- a/analysis/webservice/algorithms/LongitudeLatitudeMap.py
+++ b/analysis/webservice/algorithms/LongitudeLatitudeMap.py
@@ -14,7 +14,6 @@
# limitations under the License.
-import logging
import math
from datetime import datetime
@@ -74,13 +73,8 @@ class LongitudeLatitudeMapCalcHandlerImpl(NexusCalcHandler):
}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=True)
- self.log = logging.getLogger(__name__)
-
def parse_arguments(self, request):
# Parse input arguments
- self.log.debug("Parsing arguments")
try:
ds = request.get_dataset()[0]
except:
diff --git a/analysis/webservice/algorithms/StandardDeviationSearch.py b/analysis/webservice/algorithms/StandardDeviationSearch.py
index 231c687..1975d2d 100644
--- a/analysis/webservice/algorithms/StandardDeviationSearch.py
+++ b/analysis/webservice/algorithms/StandardDeviationSearch.py
@@ -73,13 +73,8 @@ class StandardDeviationSearchCalcHandlerImpl(NexusCalcHandler):
}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
- self.log = logging.getLogger(__name__)
-
def parse_arguments(self, request):
# Parse input arguments
- self.log.debug("Parsing arguments")
try:
ds = request.get_dataset()[0]
except:
diff --git a/analysis/webservice/algorithms/TileSearch.py b/analysis/webservice/algorithms/TileSearch.py
index a3758bc..321d94f 100644
--- a/analysis/webservice/algorithms/TileSearch.py
+++ b/analysis/webservice/algorithms/TileSearch.py
@@ -62,9 +62,6 @@ class ChunkSearchCalcHandlerImpl(NexusCalcHandler):
}
}
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=True)
-
def calc(self, computeOptions, **args):
minLat = computeOptions.get_min_lat()
maxLat = computeOptions.get_max_lat()
diff --git a/analysis/webservice/algorithms/TimeAvgMap.py b/analysis/webservice/algorithms/TimeAvgMap.py
index 3a609c5..93a9a00 100644
--- a/analysis/webservice/algorithms/TimeAvgMap.py
+++ b/analysis/webservice/algorithms/TimeAvgMap.py
@@ -37,9 +37,6 @@ class TimeAvgMapCalcHandlerImpl(NexusCalcHandler):
params = DEFAULT_PARAMETERS_SPEC
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=False)
-
def _find_native_resolution(self):
# Get a quick set of tiles (1 degree at center of box) at 1 time stamp
midLat = (self._minLat + self._maxLat) / 2
diff --git a/analysis/webservice/algorithms/TimeSeries.py b/analysis/webservice/algorithms/TimeSeries.py
index 85613d9..b1d0675 100644
--- a/analysis/webservice/algorithms/TimeSeries.py
+++ b/analysis/webservice/algorithms/TimeSeries.py
@@ -41,6 +41,7 @@ SENTINEL = 'STOP'
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+logger = logging.getLogger(__name__)
@nexus_handler
class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
@@ -84,13 +85,8 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
}
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self)
- self.log = logging.getLogger(__name__)
-
def parse_arguments(self, request):
# Parse input arguments
- self.log.debug("Parsing arguments")
try:
ds = request.get_dataset()
@@ -185,7 +181,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
except Exception:
stats = {}
tb = traceback.format_exc()
- self.log.warn("Error when calculating comparison stats:\n%s" % tb)
+ logger.warn("Error when calculating comparison stats:\n%s" % tb)
else:
stats = {}
@@ -199,7 +195,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch,
endTime=end_seconds_from_epoch)
- self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time)))
+ logger.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time)))
return res
def getTimeSeriesStatsForBoxSingleDataSet(self, bounding_polygon, ds, start_seconds_from_epoch,
@@ -214,7 +210,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
ds,
start_seconds_from_epoch,
end_seconds_from_epoch)
- self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds))
+ logger.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds))
if len(daysinrange) == 0:
raise NoDataException(reason="No data found for selected timeframe")
@@ -248,7 +244,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
result = done_queue.get()
try:
error_str = result['error']
- self.log.error(error_str)
+ logger.error(error_str)
raise NexusProcessingException(reason="Error calculating average by day.")
except KeyError:
pass
@@ -259,7 +255,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
manager.shutdown()
results = sorted(results, key=lambda entry: entry["time"])
- self.log.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
+ logger.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
if apply_seasonal_cycle_filter:
the_time = datetime.now()
@@ -272,7 +268,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
result['meanSeasonal'] = seasonal_mean
result['minSeasonal'] = seasonal_min
result['maxSeasonal'] = seasonal_max
- self.log.info(
+ logger.info(
"Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
the_time = datetime.now()
@@ -291,9 +287,9 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
except Exception as e:
# If it doesn't work log the error but ignore it
tb = traceback.format_exc()
- self.log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb)
+ logger.warn("Error calculating SeasonalLowPass filter:\n%s" % tb)
- self.log.info(
+ logger.info(
"LowPass filter calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds))
return results, {}
diff --git a/analysis/webservice/algorithms/TimeSeriesSolr.py b/analysis/webservice/algorithms/TimeSeriesSolr.py
index fbe4d43..49d75db 100644
--- a/analysis/webservice/algorithms/TimeSeriesSolr.py
+++ b/analysis/webservice/algorithms/TimeSeriesSolr.py
@@ -33,6 +33,7 @@ from webservice.webmodel import NexusResults, NexusProcessingException, NoDataEx
SENTINEL = 'STOP'
+logger = logging.getLogger(__name__)
@nexus_handler
class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
@@ -42,10 +43,6 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
params = DEFAULT_PARAMETERS_SPEC
singleton = True
- def __init__(self):
- NexusCalcHandler.__init__(self, skipCassandra=True)
- self.log = logging.getLogger(__name__)
-
def calc(self, computeOptions, **args):
"""
@@ -133,7 +130,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler):
result = done_queue.get()
try:
error_str = result['error']
- self.log.error(error_str)
+ logger.error(error_str)
raise NexusProcessingException(reason="Error calculating average by day.")
except KeyError:
pass
diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
deleted file mode 100644
index d07f929..0000000
--- a/analysis/webservice/algorithms/doms/BaseDomsHandler.py
+++ /dev/null
@@ -1,635 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import StringIO
-import os
-import csv
-import json
-from datetime import datetime
-import time
-from decimal import Decimal
-
-import numpy as np
-from pytz import timezone, UTC
-
-import config
-import geo
-from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
-from webservice.webmodel import NexusResults
-
-EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
-ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
-
-try:
- from osgeo import gdal
- from osgeo.gdalnumeric import *
-except ImportError:
- import gdal
- from gdalnumeric import *
-
-from netCDF4 import Dataset
-import netCDF4
-import tempfile
-
-
-class BaseDomsQueryCalcHandler(BaseHandler):
- def __init__(self):
- BaseHandler.__init__(self)
-
- def getDataSourceByName(self, source):
- for s in config.ENDPOINTS:
- if s["name"] == source:
- return s
- return None
-
- def _does_datasource_exist(self, ds):
- for endpoint in config.ENDPOINTS:
- if endpoint["name"] == ds:
- return True
- return False
-
-
-class DomsEncoder(json.JSONEncoder):
- def __init__(self, **args):
- json.JSONEncoder.__init__(self, **args)
-
- def default(self, obj):
- # print 'MyEncoder.default() called'
- # print type(obj)
- if obj == np.nan:
- return None # hard code string for now
- elif isinstance(obj, datetime):
- return long((obj - EPOCH).total_seconds())
- elif isinstance(obj, Decimal):
- return str(obj)
- else:
- return json.JSONEncoder.default(self, obj)
-
-
-class DomsQueryResults(NexusResults):
- def __init__(self, results=None, args=None, bounds=None, count=None, details=None, computeOptions=None,
- executionId=None, status_code=200):
- NexusResults.__init__(self, results=results, meta=None, stats=None, computeOptions=computeOptions,
- status_code=status_code)
- self.__args = args
- self.__bounds = bounds
- self.__count = count
- self.__details = details
- self.__executionId = str(executionId)
-
- def toJson(self):
- bounds = self.__bounds.toMap() if self.__bounds is not None else {}
- return json.dumps(
- {"executionId": self.__executionId, "data": self.results(), "params": self.__args, "bounds": bounds,
- "count": self.__count, "details": self.__details}, indent=4, cls=DomsEncoder)
-
- def toCSV(self):
- return DomsCSVFormatter.create(self.__executionId, self.results(), self.__args, self.__details)
-
- def toNetCDF(self):
- return DomsNetCDFFormatter.create(self.__executionId, self.results(), self.__args, self.__details)
-
-
-class DomsCSVFormatter:
- @staticmethod
- def create(executionId, results, params, details):
-
- csv_mem_file = StringIO.StringIO()
- try:
- DomsCSVFormatter.__addConstants(csv_mem_file)
- DomsCSVFormatter.__addDynamicAttrs(csv_mem_file, executionId, results, params, details)
- csv.writer(csv_mem_file).writerow([])
-
- DomsCSVFormatter.__packValues(csv_mem_file, results, params)
-
- csv_out = csv_mem_file.getvalue()
- finally:
- csv_mem_file.close()
-
- return csv_out
-
- @staticmethod
- def __packValues(csv_mem_file, results, params):
-
- writer = csv.writer(csv_mem_file)
-
- headers = [
- # Primary
- "id", "source", "lon (degrees_east)", "lat (degrees_north)", "time", "platform",
- "sea_surface_salinity (1e-3)", "sea_surface_temperature (degree_C)", "wind_speed (m s-1)", "wind_direction",
- "wind_u (m s-1)", "wind_v (m s-1)",
- # Match
- "id", "source", "lon (degrees_east)", "lat (degrees_north)", "time", "platform",
- "depth (m)", "sea_water_salinity (1e-3)",
- "sea_water_temperature (degree_C)", "wind_speed (m s-1)",
- "wind_direction", "wind_u (m s-1)", "wind_v (m s-1)"
- ]
-
- writer.writerow(headers)
-
- #
- # Only include the depth variable related to the match-up parameter. If the match-up parameter
- # is not sss or sst then do not include any depth data, just fill values.
- #
- if params["parameter"] == "sss":
- depth = "sea_water_salinity_depth"
- elif params["parameter"] == "sst":
- depth = "sea_water_temperature_depth"
- else:
- depth = "NO_DEPTH"
-
- for primaryValue in results:
- for matchup in primaryValue["matches"]:
- row = [
- # Primary
- primaryValue["id"], primaryValue["source"], str(primaryValue["x"]), str(primaryValue["y"]),
- primaryValue["time"].strftime(ISO_8601), primaryValue["platform"],
- primaryValue.get("sea_water_salinity", ""), primaryValue.get("sea_water_temperature", ""),
- primaryValue.get("wind_speed", ""), primaryValue.get("wind_direction", ""),
- primaryValue.get("wind_u", ""), primaryValue.get("wind_v", ""),
-
- # Matchup
- matchup["id"], matchup["source"], matchup["x"], matchup["y"],
- matchup["time"].strftime(ISO_8601), matchup["platform"],
- matchup.get(depth, ""), matchup.get("sea_water_salinity", ""),
- matchup.get("sea_water_temperature", ""),
- matchup.get("wind_speed", ""), matchup.get("wind_direction", ""),
- matchup.get("wind_u", ""), matchup.get("wind_v", ""),
- ]
- writer.writerow(row)
-
- @staticmethod
- def __addConstants(csvfile):
-
- global_attrs = [
- {"Global Attribute": "product_version", "Value": "1.0"},
- {"Global Attribute": "Conventions", "Value": "CF-1.6, ACDD-1.3"},
- {"Global Attribute": "title", "Value": "DOMS satellite-insitu machup output file"},
- {"Global Attribute": "history",
- "Value": "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"},
- {"Global Attribute": "institution", "Value": "JPL, FSU, NCAR"},
- {"Global Attribute": "source", "Value": "doms.jpl.nasa.gov"},
- {"Global Attribute": "standard_name_vocabulary",
- "Value": "CF Standard Name Table v27, BODC controlled vocabulary"},
- {"Global Attribute": "cdm_data_type", "Value": "Point/Profile, Swath/Grid"},
- {"Global Attribute": "processing_level", "Value": "4"},
- {"Global Attribute": "project", "Value": "Distributed Oceanographic Matchup System (DOMS)"},
- {"Global Attribute": "keywords_vocabulary",
- "Value": "NASA Global Change Master Directory (GCMD) Science Keywords"},
- # TODO What should the keywords be?
- {"Global Attribute": "keywords", "Value": "SATELLITES, OCEAN PLATFORMS, SHIPS, BUOYS, MOORINGS, AUVS, ROV, "
- "NASA/JPL/PODAAC, FSU/COAPS, UCAR/NCAR, SALINITY, "
- "SEA SURFACE TEMPERATURE, SURFACE WINDS"},
- {"Global Attribute": "creator_name", "Value": "NASA PO.DAAC"},
- {"Global Attribute": "creator_email", "Value": "podaac@podaac.jpl.nasa.gov"},
- {"Global Attribute": "creator_url", "Value": "https://podaac.jpl.nasa.gov/"},
- {"Global Attribute": "publisher_name", "Value": "NASA PO.DAAC"},
- {"Global Attribute": "publisher_email", "Value": "podaac@podaac.jpl.nasa.gov"},
- {"Global Attribute": "publisher_url", "Value": "https://podaac.jpl.nasa.gov"},
- {"Global Attribute": "acknowledgment", "Value": "DOMS is a NASA/AIST-funded project. NRA NNH14ZDA001N."},
- ]
-
- writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys()))
-
- writer.writerows(global_attrs)
-
- @staticmethod
- def __addDynamicAttrs(csvfile, executionId, results, params, details):
-
- platforms = set()
- for primaryValue in results:
- platforms.add(primaryValue['platform'])
- for match in primaryValue['matches']:
- platforms.add(match['platform'])
-
- # insituDatasets = params["matchup"].split(",")
- insituDatasets = params["matchup"]
- insituLinks = set()
- for insitu in insituDatasets:
- insituLinks.add(config.METADATA_LINKS[insitu])
-
-
- global_attrs = [
- {"Global Attribute": "Platform", "Value": ', '.join(platforms)},
- {"Global Attribute": "time_coverage_start",
- "Value": params["startTime"].strftime(ISO_8601)},
- {"Global Attribute": "time_coverage_end",
- "Value": params["endTime"].strftime(ISO_8601)},
- {"Global Attribute": "time_coverage_resolution", "Value": "point"},
-
- {"Global Attribute": "geospatial_lon_min", "Value": params["bbox"].split(',')[0]},
- {"Global Attribute": "geospatial_lat_min", "Value": params["bbox"].split(',')[1]},
- {"Global Attribute": "geospatial_lon_max", "Value": params["bbox"].split(',')[2]},
- {"Global Attribute": "geospatial_lat_max", "Value": params["bbox"].split(',')[3]},
- {"Global Attribute": "geospatial_lat_resolution", "Value": "point"},
- {"Global Attribute": "geospatial_lon_resolution", "Value": "point"},
- {"Global Attribute": "geospatial_lat_units", "Value": "degrees_north"},
- {"Global Attribute": "geospatial_lon_units", "Value": "degrees_east"},
-
- {"Global Attribute": "geospatial_vertical_min", "Value": params["depthMin"]},
- {"Global Attribute": "geospatial_vertical_max", "Value": params["depthMax"]},
- {"Global Attribute": "geospatial_vertical_units", "Value": "m"},
- {"Global Attribute": "geospatial_vertical_resolution", "Value": "point"},
- {"Global Attribute": "geospatial_vertical_positive", "Value": "down"},
-
- {"Global Attribute": "DOMS_matchID", "Value": executionId},
- {"Global Attribute": "DOMS_TimeWindow", "Value": params["timeTolerance"] / 60 / 60},
- {"Global Attribute": "DOMS_TimeWindow_Units", "Value": "hours"},
-
- {"Global Attribute": "DOMS_platforms", "Value": params["platforms"]},
- {"Global Attribute": "DOMS_SearchRadius", "Value": params["radiusTolerance"]},
- {"Global Attribute": "DOMS_SearchRadius_Units", "Value": "m"},
-
- {"Global Attribute": "DOMS_DatasetMetadata", "Value": ', '.join(insituLinks)},
- {"Global Attribute": "DOMS_primary", "Value": params["primary"]},
- {"Global Attribute": "DOMS_match_up", "Value": params["matchup"]},
- {"Global Attribute": "DOMS_ParameterPrimary", "Value": params.get("parameter", "")},
-
- {"Global Attribute": "DOMS_time_to_complete", "Value": details["timeToComplete"]},
- {"Global Attribute": "DOMS_time_to_complete_units", "Value": "seconds"},
- {"Global Attribute": "DOMS_num_matchup_matched", "Value": details["numInSituMatched"]},
- {"Global Attribute": "DOMS_num_primary_matched", "Value": details["numGriddedMatched"]},
-
- {"Global Attribute": "date_modified", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
- {"Global Attribute": "date_created", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
-
- {"Global Attribute": "URI_Matchup", "Value": "http://{webservice}/domsresults?id=" + executionId + "&output=CSV"},
- ]
-
- writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys()))
-
- writer.writerows(global_attrs)
-
-
-class DomsNetCDFFormatter:
- @staticmethod
- def create(executionId, results, params, details):
-
- t = tempfile.mkstemp(prefix="doms_", suffix=".nc")
- tempFileName = t[1]
-
- dataset = Dataset(tempFileName, "w", format="NETCDF4")
- dataset.DOMS_matchID = executionId
- DomsNetCDFFormatter.__addNetCDFConstants(dataset)
-
- dataset.date_modified = datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)
- dataset.date_created = datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)
- dataset.time_coverage_start = params["startTime"].strftime(ISO_8601)
- dataset.time_coverage_end = params["endTime"].strftime(ISO_8601)
- dataset.time_coverage_resolution = "point"
- dataset.DOMS_match_up = params["matchup"]
- dataset.DOMS_num_matchup_matched = details["numInSituMatched"]
- dataset.DOMS_num_primary_matched = details["numGriddedMatched"]
-
- bbox = geo.BoundingBox(asString=params["bbox"])
- dataset.geospatial_lat_max = bbox.north
- dataset.geospatial_lat_min = bbox.south
- dataset.geospatial_lon_max = bbox.east
- dataset.geospatial_lon_min = bbox.west
- dataset.geospatial_lat_resolution = "point"
- dataset.geospatial_lon_resolution = "point"
- dataset.geospatial_lat_units = "degrees_north"
- dataset.geospatial_lon_units = "degrees_east"
- dataset.geospatial_vertical_min = float(params["depthMin"])
- dataset.geospatial_vertical_max = float(params["depthMax"])
- dataset.geospatial_vertical_units = "m"
- dataset.geospatial_vertical_resolution = "point"
- dataset.geospatial_vertical_positive = "down"
-
- dataset.DOMS_TimeWindow = params["timeTolerance"] / 60 / 60
- dataset.DOMS_TimeWindow_Units = "hours"
- dataset.DOMS_SearchRadius = float(params["radiusTolerance"])
- dataset.DOMS_SearchRadius_Units = "m"
- # dataset.URI_Subset = "http://webservice subsetting query request"
- dataset.URI_Matchup = "http://{webservice}/domsresults?id=" + executionId + "&output=NETCDF"
- dataset.DOMS_ParameterPrimary = params["parameter"] if "parameter" in params else ""
- dataset.DOMS_platforms = params["platforms"]
- dataset.DOMS_primary = params["primary"]
- dataset.DOMS_time_to_complete = details["timeToComplete"]
- dataset.DOMS_time_to_complete_units = "seconds"
-
- insituDatasets = params["matchup"]
- insituLinks = set()
- for insitu in insituDatasets:
- insituLinks.add(config.METADATA_LINKS[insitu])
- dataset.DOMS_DatasetMetadata = ', '.join(insituLinks)
-
- platforms = set()
- for primaryValue in results:
- platforms.add(primaryValue['platform'])
- for match in primaryValue['matches']:
- platforms.add(match['platform'])
- dataset.platform = ', '.join(platforms)
-
- satellite_group_name = "SatelliteData"
- insitu_group_name = "InsituData"
-
- #Create Satellite group, variables, and attributes
- satelliteGroup = dataset.createGroup(satellite_group_name)
- satelliteWriter = DomsNetCDFValueWriter(satelliteGroup, params["parameter"])
-
- # Create InSitu group, variables, and attributes
- insituGroup = dataset.createGroup(insitu_group_name)
- insituWriter = DomsNetCDFValueWriter(insituGroup, params["parameter"])
-
- # Add data to Insitu and Satellite groups, generate array of match ID pairs
- matches = DomsNetCDFFormatter.__writeResults(results, satelliteWriter, insituWriter)
- dataset.createDimension("MatchedRecords", size=None)
- dataset.createDimension("MatchedGroups", size=2)
- matchArray = dataset.createVariable("matchIDs", "f4", ("MatchedRecords", "MatchedGroups"))
- matchArray[:] = matches
-
- dataset.close()
- f = open(tempFileName, "rb")
- data = f.read()
- f.close()
- os.unlink(tempFileName)
- return data
-
- @staticmethod
- def __addNetCDFConstants(dataset):
- dataset.product_version = "1.0"
- dataset.Conventions = "CF-1.6, ACDD-1.3"
- dataset.title = "DOMS satellite-insitu machup output file"
- dataset.history = "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"
- dataset.institution = "JPL, FSU, NCAR"
- dataset.source = "doms.jpl.nasa.gov"
- dataset.standard_name_vocabulary = "CF Standard Name Table v27", "BODC controlled vocabulary"
- dataset.cdm_data_type = "Point/Profile, Swath/Grid"
- dataset.processing_level = "4"
- dataset.project = "Distributed Oceanographic Matchup System (DOMS)"
- dataset.keywords_vocabulary = "NASA Global Change Master Directory (GCMD) Science Keywords"
- dataset.keywords = "SATELLITES, OCEAN PLATFORMS, SHIPS, BUOYS, MOORINGS, AUVS, ROV, NASA/JPL/PODAAC, " \
- "FSU/COAPS, UCAR/NCAR, SALINITY, SEA SURFACE TEMPERATURE, SURFACE WINDS"
- dataset.creator_name = "NASA PO.DAAC"
- dataset.creator_email = "podaac@podaac.jpl.nasa.gov"
- dataset.creator_url = "https://podaac.jpl.nasa.gov/"
- dataset.publisher_name = "NASA PO.DAAC"
- dataset.publisher_email = "podaac@podaac.jpl.nasa.gov"
- dataset.publisher_url = "https://podaac.jpl.nasa.gov"
- dataset.acknowledgment = "DOMS is a NASA/AIST-funded project. NRA NNH14ZDA001N."
-
- @staticmethod
- def __writeResults(results, satelliteWriter, insituWriter):
- ids = {}
- matches = []
- insituIndex = 0
-
- #
- # Loop through all of the results, add each satellite data point to the array
- #
- for r in range(0, len(results)):
- result = results[r]
- satelliteWriter.addData(result)
-
- # Add each match only if it is not already in the array of in situ points
- for match in result["matches"]:
- if match["id"] not in ids:
- ids[match["id"]] = insituIndex
- insituIndex += 1
- insituWriter.addData(match)
-
- # Append an index pait of (satellite, in situ) to the array of matches
- matches.append((r, ids[match["id"]]))
-
- # Add data/write to the netCDF file
- satelliteWriter.writeGroup()
- insituWriter.writeGroup()
-
- return matches
-
-
-class DomsNetCDFValueWriter:
- def __init__(self, group, matchup_parameter):
- group.createDimension("dim", size=None)
- self.group = group
-
- self.lat = []
- self.lon = []
- self.time = []
- self.sea_water_salinity = []
- self.wind_speed = []
- self.wind_u = []
- self.wind_v = []
- self.wind_direction = []
- self.sea_water_temperature = []
- self.depth = []
-
- self.satellite_group_name = "SatelliteData"
- self.insitu_group_name = "InsituData"
-
- #
- # Only include the depth variable related to the match-up parameter. If the match-up parameter is
- # not sss or sst then do not include any depth data, just fill values.
- #
- if matchup_parameter == "sss":
- self.matchup_depth = "sea_water_salinity_depth"
- elif matchup_parameter == "sst":
- self.matchup_depth = "sea_water_temperature_depth"
- else:
- self.matchup_depth = "NO_DEPTH"
-
- def addData(self, value):
- self.lat.append(value.get("y", None))
- self.lon.append(value.get("x", None))
- self.time.append(time.mktime(value.get("time").timetuple()))
- self.sea_water_salinity.append(value.get("sea_water_salinity", None))
- self.wind_speed.append(value.get("wind_speed", None))
- self.wind_u.append(value.get("wind_u", None))
- self.wind_v.append(value.get("wind_v", None))
- self.wind_direction.append(value.get("wind_direction", None))
- self.sea_water_temperature.append(value.get("sea_water_temperature", None))
- self.depth.append(value.get(self.matchup_depth, None))
-
- def writeGroup(self):
- #
- # Create variables, enrich with attributes, and add data
- #
- lonVar = self.group.createVariable("lon", "f4", ("dim",), fill_value=-32767.0)
- latVar = self.group.createVariable("lat", "f4", ("dim",), fill_value=-32767.0)
- timeVar = self.group.createVariable("time", "f4", ("dim",), fill_value=-32767.0)
-
- self.__enrichLon(lonVar, min(self.lon), max(self.lon))
- self.__enrichLat(latVar, min(self.lat), max(self.lat))
- self.__enrichTime(timeVar)
-
- latVar[:] = self.lat
- lonVar[:] = self.lon
- timeVar[:] = self.time
-
- if self.sea_water_salinity.count(None) != len(self.sea_water_salinity):
- if self.group.name == self.satellite_group_name:
- sssVar = self.group.createVariable("SeaSurfaceSalinity", "f4", ("dim",), fill_value=-32767.0)
- self.__enrichSSSMeasurements(sssVar, min(self.sea_water_salinity), max(self.sea_water_salinity))
- else: # group.name == self.insitu_group_name
- sssVar = self.group.createVariable("SeaWaterSalinity", "f4", ("dim",), fill_value=-32767.0)
- self.__enrichSWSMeasurements(sssVar, min(self.sea_water_salinity), max(self.sea_water_salinity))
- sssVar[:] = self.sea_water_salinity
-
- if self.wind_speed.count(None) != len(self.wind_speed):
- windSpeedVar = self.group.createVariable("WindSpeed", "f4", ("dim",), fill_value=-32767.0)
- self.__enrichWindSpeed(windSpeedVar, self.__calcMin(self.wind_speed), max(self.wind_speed))
- windSpeedVar[:] = self.wind_speed
-
- if self.wind_u.count(None) != len(self.wind_u):
- windUVar = self.group.createVariable("WindU", "f4", ("dim",), fill_value=-32767.0)
- windUVar[:] = self.wind_u
- self.__enrichWindU(windUVar, self.__calcMin(self.wind_u), max(self.wind_u))
-
- if self.wind_v.count(None) != len(self.wind_v):
- windVVar = self.group.createVariable("WindV", "f4", ("dim",), fill_value=-32767.0)
- windVVar[:] = self.wind_v
- self.__enrichWindV(windVVar, self.__calcMin(self.wind_v), max(self.wind_v))
-
- if self.wind_direction.count(None) != len(self.wind_direction):
- windDirVar = self.group.createVariable("WindDirection", "f4", ("dim",), fill_value=-32767.0)
- windDirVar[:] = self.wind_direction
- self.__enrichWindDir(windDirVar)
-
- if self.sea_water_temperature.count(None) != len(self.sea_water_temperature):
- if self.group.name == self.satellite_group_name:
- tempVar = self.group.createVariable("SeaSurfaceTemp", "f4", ("dim",), fill_value=-32767.0)
- self.__enrichSurfaceTemp(tempVar, self.__calcMin(self.sea_water_temperature), max(self.sea_water_temperature))
- else:
- tempVar = self.group.createVariable("SeaWaterTemp", "f4", ("dim",), fill_value=-32767.0)
- self.__enrichWaterTemp(tempVar, self.__calcMin(self.sea_water_temperature), max(self.sea_water_temperature))
- tempVar[:] = self.sea_water_temperature
-
- if self.group.name == self.insitu_group_name:
- depthVar = self.group.createVariable("Depth", "f4", ("dim",), fill_value=-32767.0)
-
- if self.depth.count(None) != len(self.depth):
- self.__enrichDepth(depthVar, self.__calcMin(self.depth), max(self.depth))
- depthVar[:] = self.depth
- else:
- # If depth has no data, set all values to 0
- tempDepth = [0 for x in range(len(self.depth))]
- depthVar[:] = tempDepth
-
- #
- # Lists may include 'None" values, to calc min these must be filtered out
- #
- @staticmethod
- def __calcMin(var):
- return min(x for x in var if x is not None)
-
-
- #
- # Add attributes to each variable
- #
- @staticmethod
- def __enrichLon(var, var_min, var_max):
- var.long_name = "Longitude"
- var.standard_name = "longitude"
- var.axis = "X"
- var.units = "degrees_east"
- var.valid_min = var_min
- var.valid_max = var_max
-
- @staticmethod
- def __enrichLat(var, var_min, var_max):
- var.long_name = "Latitude"
- var.standard_name = "latitude"
- var.axis = "Y"
- var.units = "degrees_north"
- var.valid_min = var_min
- var.valid_max = var_max
-
- @staticmethod
- def __enrichTime(var):
- var.long_name = "Time"
- var.standard_name = "time"
- var.axis = "T"
- var.units = "seconds since 1970-01-01 00:00:00 0:00"
-
- @staticmethod
- def __enrichSSSMeasurements(var, var_min, var_max):
- var.long_name = "Sea surface salinity"
- var.standard_name = "sea_surface_salinity"
- var.units = "1e-3"
- var.valid_min = var_min
- var.valid_max = var_max
- var.coordinates = "lon lat time"
-
- @staticmethod
- def __enrichSWSMeasurements(var, var_min, var_max):
- var.long_name = "Sea water salinity"
- var.standard_name = "sea_water_salinity"
- var.units = "1e-3"
- var.valid_min = var_min
- var.valid_max = var_max
- var.coordinates = "lon lat depth time"
-
- @staticmethod
- def __enrichDepth(var, var_min, var_max):
- var.valid_min = var_min
- var.valid_max = var_max
- var.units = "m"
- var.long_name = "Depth"
- var.standard_name = "depth"
- var.axis = "Z"
- var.positive = "Down"
-
- @staticmethod
- def __enrichWindSpeed(var, var_min, var_max):
- var.long_name = "Wind speed"
- var.standard_name = "wind_speed"
- var.units = "m s-1"
- var.valid_min = var_min
- var.valid_max = var_max
- var.coordinates = "lon lat depth time"
-
- @staticmethod
- def __enrichWindU(var, var_min, var_max):
- var.long_name = "Eastward wind"
- var.standard_name = "eastward_wind"
- var.units = "m s-1"
- var.valid_min = var_min
- var.valid_max = var_max
- var.coordinates = "lon lat depth time"
-
- @staticmethod
- def __enrichWindV(var, var_min, var_max):
- var.long_name = "Northward wind"
- var.standard_name = "northward_wind"
- var.units = "m s-1"
- var.valid_min = var_min
- var.valid_max = var_max
- var.coordinates = "lon lat depth time"
-
- @staticmethod
- def __enrichWaterTemp(var, var_min, var_max):
- var.long_name = "Sea water temperature"
- var.standard_name = "sea_water_temperature"
- var.units = "degree_C"
- var.valid_min = var_min
- var.valid_max = var_max
- var.coordinates = "lon lat depth time"
-
- @staticmethod
- def __enrichSurfaceTemp(var, var_min, var_max):
- var.long_name = "Sea surface temperature"
- var.standard_name = "sea_surface_temperature"
- var.units = "degree_C"
- var.valid_min = var_min
- var.valid_max = var_max
- var.coordinates = "lon lat time"
-
- @staticmethod
- def __enrichWindDir(var):
- var.long_name = "Wind from direction"
- var.standard_name = "wind_from_direction"
- var.units = "degree"
- var.coordinates = "lon lat depth time"
diff --git a/analysis/webservice/algorithms/doms/DatasetListQuery.py b/analysis/webservice/algorithms/doms/DatasetListQuery.py
deleted file mode 100644
index ac7f263..0000000
--- a/analysis/webservice/algorithms/doms/DatasetListQuery.py
+++ /dev/null
@@ -1,116 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-import traceback
-
-import requests
-
-import BaseDomsHandler
-import config
-import values
-from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
-from webservice.NexusHandler import nexus_handler
-from webservice.webmodel import cached
-
-
-@nexus_handler
-class DomsDatasetListQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
- name = "DOMS Dataset Listing"
- path = "/domslist"
- description = ""
- params = {}
- singleton = True
-
- def __init__(self):
- BaseHandler.__init__(self)
-
- def getFacetsForInsituSource(self, source):
- url = source["url"]
-
- params = {
- "facet": "true",
- "stats": "true",
- "startIndex": 0,
- "itemsPerPage": 0
- }
- try:
- r = requests.get(url, params=params)
- results = json.loads(r.text)
-
- depths = None
- if "stats_fields" in results and "depth" in results["stats_fields"]:
- depths = results["stats_fields"]["depth"]
-
- for facet in results["facets"]:
- field = facet["field"]
- for value in facet["values"]:
- value["value"] = values.getDescByListNameAndId(field, int(value["value"]))
-
- return depths, results["facets"]
- except: # KMG: Don't eat the exception. Add better handling...
- traceback.print_exc()
- return None, None
-
- def getMetadataUrlForDataset(self, dataset):
- datasetSpec = config.getEndpointByName(dataset)
- if datasetSpec is not None:
- return datasetSpec["metadataUrl"]
- else:
-
- # KMG: NOT a good hack
- if dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" or dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1_CLIM":
- dataset = "MUR-JPL-L4-GLOB-v4.1"
- elif dataset == "SMAP_L2B_SSS":
- dataset = "JPL_SMAP-SSS_L2_EVAL-V2"
- elif dataset == "AVHRR_OI_L4_GHRSST_NCEI" or dataset == "AVHRR_OI_L4_GHRSST_NCEI_CLIM":
- dataset = "AVHRR_OI-NCEI-L4-GLOB-v2.0"
-
- return "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json" % dataset
-
- def getMetadataForSource(self, dataset):
- try:
- r = requests.get(self.getMetadataUrlForDataset(dataset))
- results = json.loads(r.text)
- return results
- except:
- return None
-
- @cached(ttl=(60 * 60 * 1000)) # 1 hour cached
- def calc(self, computeOptions, **args):
-
- satellitesList = self._get_tile_service().get_dataseries_list(simple=True)
-
- insituList = []
-
- for satellite in satellitesList:
- satellite["metadata"] = self.getMetadataForSource(satellite["shortName"])
-
- for insitu in config.ENDPOINTS:
- depths, facets = self.getFacetsForInsituSource(insitu)
- insituList.append({
- "name": insitu["name"],
- "endpoint": insitu["url"],
- "metadata": self.getMetadataForSource(insitu["name"]),
- "depths": depths,
- "facets": facets
- })
-
- values = {
- "satellite": satellitesList,
- "insitu": insituList
- }
-
- return BaseDomsHandler.DomsQueryResults(results=values)
diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py
deleted file mode 100644
index 2d429ca..0000000
--- a/analysis/webservice/algorithms/doms/DomsInitialization.py
+++ /dev/null
@@ -1,164 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-
-import ConfigParser
-import logging
-
-import pkg_resources
-from cassandra.cluster import Cluster
-from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy
-
-from webservice.NexusHandler import nexus_initializer
-
-@nexus_initializer
-class DomsInitializer:
- def __init__(self):
- pass
-
- def init(self, config):
- log = logging.getLogger(__name__)
- log.info("*** STARTING DOMS INITIALIZATION ***")
-
- domsconfig = ConfigParser.SafeConfigParser()
- domsconfig.read(DomsInitializer._get_config_files('domsconfig.ini'))
-
- cassHost = domsconfig.get("cassandra", "host")
- cassPort = domsconfig.get("cassandra", "port")
- cassKeyspace = domsconfig.get("cassandra", "keyspace")
- cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
- cassVersion = int(domsconfig.get("cassandra", "protocol_version"))
- cassPolicy = domsconfig.get("cassandra", "dc_policy")
-
- log.info("Cassandra Host(s): %s" % (cassHost))
- log.info("Cassandra Keyspace: %s" % (cassKeyspace))
- log.info("Cassandra Datacenter: %s" % (cassDatacenter))
- log.info("Cassandra Protocol Version: %s" % (cassVersion))
- log.info("Cassandra DC Policy: %s" % (cassPolicy))
-
- if cassPolicy == 'DCAwareRoundRobinPolicy':
- dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
- elif cassPolicy == 'WhiteListRoundRobinPolicy':
- dc_policy = WhiteListRoundRobinPolicy([cassHost])
- token_policy = TokenAwarePolicy(dc_policy)
-
- with Cluster([host for host in cassHost.split(',')], port=int(cassPort), load_balancing_policy=token_policy,
- protocol_version=cassVersion) as cluster:
- session = cluster.connect()
-
- self.createKeyspace(session, cassKeyspace)
- self.createTables(session)
-
- def createKeyspace(self, session, cassKeyspace):
- log = logging.getLogger(__name__)
- log.info("Verifying DOMS keyspace '%s'" % cassKeyspace)
- session.execute(
- "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" % cassKeyspace)
- session.set_keyspace(cassKeyspace)
-
- def createTables(self, session):
- log = logging.getLogger(__name__)
- log.info("Verifying DOMS tables")
- self.createDomsExecutionsTable(session)
- self.createDomsParamsTable(session)
- self.createDomsDataTable(session)
- self.createDomsExecutionStatsTable(session)
-
- def createDomsExecutionsTable(self, session):
- log = logging.getLogger(__name__)
- log.info("Verifying doms_executions table")
- cql = """
- CREATE TABLE IF NOT EXISTS doms_executions (
- id uuid PRIMARY KEY,
- time_started timestamp,
- time_completed timestamp,
- user_email text
- );
- """
- session.execute(cql)
-
- def createDomsParamsTable(self, session):
- log = logging.getLogger(__name__)
- log.info("Verifying doms_params table")
- cql = """
- CREATE TABLE IF NOT EXISTS doms_params (
- execution_id uuid PRIMARY KEY,
- primary_dataset text,
- matchup_datasets text,
- depth_tolerance decimal,
- depth_min decimal,
- depth_max decimal,
- time_tolerance int,
- radius_tolerance decimal,
- start_time timestamp,
- end_time timestamp,
- platforms text,
- bounding_box text,
- parameter text
- );
- """
- session.execute(cql)
-
- def createDomsDataTable(self, session):
- log = logging.getLogger(__name__)
- log.info("Verifying doms_data table")
- cql = """
- CREATE TABLE IF NOT EXISTS doms_data (
- id uuid,
- execution_id uuid,
- value_id text,
- primary_value_id text,
- is_primary boolean,
- x decimal,
- y decimal,
- source_dataset text,
- measurement_time timestamp,
- platform text,
- device text,
- measurement_values map<text, decimal>,
- PRIMARY KEY (execution_id, is_primary, id)
- );
- """
- session.execute(cql)
-
- def createDomsExecutionStatsTable(self, session):
- log = logging.getLogger(__name__)
- log.info("Verifying doms_execuction_stats table")
- cql = """
- CREATE TABLE IF NOT EXISTS doms_execution_stats (
- execution_id uuid PRIMARY KEY,
- num_gridded_matched int,
- num_gridded_checked int,
- num_insitu_matched int,
- num_insitu_checked int,
- time_to_complete int
- );
- """
- session.execute(cql)
-
- @staticmethod
- def _get_config_files(filename):
- log = logging.getLogger(__name__)
- candidates = []
- extensions = ['.default', '']
- for extension in extensions:
- try:
- candidate = pkg_resources.resource_filename(__name__, filename + extension)
- candidates.append(candidate)
- except KeyError as ke:
- log.warning('configuration file {} not found'.format(filename + extension))
-
- return candidates
diff --git a/analysis/webservice/algorithms/doms/MatchupQuery.py b/analysis/webservice/algorithms/doms/MatchupQuery.py
deleted file mode 100644
index 57a0834..0000000
--- a/analysis/webservice/algorithms/doms/MatchupQuery.py
+++ /dev/null
@@ -1,452 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import math
-import uuid
-from datetime import datetime
-
-import numpy as np
-import utm
-from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon
-from scipy import spatial
-
-import BaseDomsHandler
-import ResultsStorage
-import datafetch
-import fetchedgeimpl
-import geo
-import workerthread
-from webservice.NexusHandler import nexus_handler
-
-
@nexus_handler
class CombinedDomsMatchupQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Match a primary in-situ dataset against in-situ and/or NEXUS gridded datasets.

    Served at ``/domsmatchup``. The matched records are stored through
    ``ResultsStorage.ResultsStorage`` and returned as a ``BaseDomsHandler.DomsQueryResults``
    carrying the new execution id.
    """
    name = "Experimental Combined DOMS In-Situ Matchup"
    path = "/domsmatchup"
    description = ""
    params = {}
    singleton = True

    def __init__(self):
        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)

    def fetchData(self, endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms):
        """Fetch records from every endpoint on worker threads and merge them.

        ``platforms`` is accepted but not forwarded to ``datafetch.fetchData``.

        :return: ``(records, boundsConstrainer)``. The constrainer is built from ``bbox`` and then
            combined with each endpoint's reported bounds via ``testOtherConstrainer``.
        """
        boundsConstrainer = geo.BoundsConstrainer(asString=bbox)
        workers = [workerthread.WorkerThread(datafetch.fetchData,
                                             params=(endpoint, startTime, endTime, bbox, depth_min, depth_max))
                   for endpoint in endpoints]
        workerthread.wait(workers, startFirst=True, poll=0.01)

        merged = []
        for worker in workers:
            records, workerBounds = worker.results
            merged += records
            boundsConstrainer.testOtherConstrainer(workerBounds)

        return merged, boundsConstrainer

    def __parseDatetime(self, dtString):
        """Convert ``YYYY-MM-DDTHH:MM:SSZ`` into float milliseconds since the Unix epoch."""
        parsed = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
        return (parsed - datetime.utcfromtimestamp(0)).total_seconds() * 1000.0

    def _tilesByDay(self, bounds, datasetId, startTime, endTime):
        """Map each day found for ``datasetId`` inside ``bounds`` and the time range to its tiles.

        The range is passed to the tile service as epoch seconds (parsed milliseconds / 1000).
        """
        days = self._get_tile_service().find_days_in_range_asc(bounds.south, bounds.north, bounds.west,
                                                                bounds.east, datasetId,
                                                                self.__parseDatetime(startTime) / 1000,
                                                                self.__parseDatetime(endTime) / 1000)
        return {
            day: self._get_tile_service().get_tiles_bounded_by_box_at_time(bounds.south, bounds.north,
                                                                           bounds.west, bounds.east,
                                                                           datasetId, day)
            for day in days
        }

    def calc(self, computeOptions, **args):
        """Run the matchup, persist it and return the results.

        Request arguments:
            ``primary`` (required) and ``matchup`` (required, comma-separated) dataset names.
            ``s``/``e``: start and end times as ``YYYY-MM-DDTHH:MM:SSZ``.
            ``b``: bounding-box string.
            ``tt`` / ``rt``: time and radius tolerances.
            ``depthMin``, ``depthMax``, ``platforms``.

        Only primary records with at least one match per requested matchup dataset are kept.

        :raises Exception: when ``primary``/``matchup`` are missing or the primary dataset is unknown
        """
        primary = computeOptions.get_argument("primary", None)
        matchup = computeOptions.get_argument("matchup", None)
        startTime = computeOptions.get_argument("s", None)
        endTime = computeOptions.get_argument("e", None)
        bbox = computeOptions.get_argument("b", None)
        timeTolerance = computeOptions.get_float_arg("tt")
        depth_min = computeOptions.get_float_arg("depthMin", default=None)
        depth_max = computeOptions.get_float_arg("depthMax", default=None)
        radiusTolerance = computeOptions.get_float_arg("rt")
        platforms = computeOptions.get_argument("platforms", None)

        if not primary:
            raise Exception("No primary dataset specified")
        if not matchup:
            raise Exception("No matchup datasets specified")

        start = self._now()

        primarySpec = self.getDataSourceByName(primary)
        if primarySpec is None:
            raise Exception("Specified primary dataset not found using identifier '%s'" % primary)

        primaryData, bounds = self.fetchData([primarySpec], startTime, endTime, bbox, depth_min, depth_max, platforms)
        primaryContext = MatchupContext(primaryData)

        matchupIds = matchup.split(",")
        for matchupId in matchupIds:
            matchupSpec = self.getDataSourceByName(matchupId)
            if matchupSpec is None:
                # Not in the in-situ configuration: treat it as a NEXUS tiled dataset (single-threaded).
                tilesByDay = self._tilesByDay(bounds, matchupId, startTime, endTime)
                primaryContext.processGridded(tilesByDay, matchupId, radiusTolerance, timeTolerance)
            else:
                InsituDatasetProcessor(primaryContext, matchupSpec, startTime, endTime, bbox, depth_min, depth_max,
                                       platforms, timeTolerance, radiusTolerance).start()

        matches, _ = primaryContext.getFinal(len(matchupIds))

        end = self._now()

        args = {
            "primary": primary,
            "matchup": matchupIds,
            "startTime": startTime,
            "endTime": endTime,
            "bbox": bbox,
            "timeTolerance": timeTolerance,
            "depthMin": depth_min,
            "depthMax": depth_max,
            "radiusTolerance": radiusTolerance,
            "platforms": platforms
        }

        details = {
            "timeToComplete": (end - start),
            "numInSituRecords": primaryContext.insituCount,
            "numInSituMatched": primaryContext.insituMatches,
            "numGriddedChecked": primaryContext.griddedCount,
            "numGriddedMatched": primaryContext.griddedMatched
        }

        with ResultsStorage.ResultsStorage() as resultsStorage:
            execution_id = resultsStorage.insertResults(results=matches, params=args, stats=details, startTime=start,
                                                        completeTime=end, userEmail="")

        return BaseDomsHandler.DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None,
                                                computeOptions=None, executionId=execution_id)
-
-
class MatchupContextMap:
    """Placeholder registry for matchup contexts; every operation is currently a no-op."""

    def __init__(self):
        return None

    def add(self, context):
        """Register ``context`` (not implemented; does nothing)."""
        return None

    def delete(self, context):
        """Unregister ``context`` (not implemented; does nothing)."""
        return None
-
-
class MatchupContext:
    """Accumulate spatial matches for a list of primary records.

    Each primary record is a dict with at least ``"x"`` (lon), ``"y"`` (lat) and ``"time"``.
    Each record gets a ``"matches"`` list. Primary positions are projected with
    ``utm.from_latlon`` and indexed in a ``scipy.spatial.KDTree`` for radius queries.
    """

    def __init__(self, primaryData):
        self.id = str(uuid.uuid4())

        self.griddedCount = 0
        self.griddedMatched = 0

        # Seeded with the number of primary records; processInSitu adds one per in-situ record checked.
        self.insituCount = len(primaryData)
        self.insituMatches = 0

        self.primary = primaryData
        for r in self.primary:
            r["matches"] = []

        # KD-tree points are (easting, northing, 0.0) from utm.from_latlon.
        # NOTE(review): UTM coordinates from different zones are not directly comparable, so radius
        # queries near a zone boundary may be inaccurate.
        self.data = []
        for s in primaryData:
            u = utm.from_latlon(s["y"], s["x"])
            self.data.append((u[0], u[1], 0.0))

        if len(self.data) > 0:
            self.tree = spatial.KDTree(self.data)
        else:
            self.tree = None

    def getFinal(self, minMatchesToInclude):
        """Return ``(records, totalMatches)`` for primaries with at least ``minMatchesToInclude`` matches."""
        matched = []
        ttlMatches = 0
        for m in self.primary:
            if len(m["matches"]) >= minMatchesToInclude:
                matched.append(m)
                ttlMatches += len(m["matches"])

        return matched, ttlMatches

    def processGridded(self, tilesByDay, source, xyTolerance, timeTolerance):
        """Attach gridded nodes within ``xyTolerance`` (UTM distance) of every primary record.

        :param tilesByDay: ``{day timestamp: [tiles]}``; timestamps are multiplied by 1000 before
            being compared with the primary ``"time"`` values
        :param source: dataset name, also used to choose which output field receives the tile value

        NOTE(review): ``timeTolerance`` is accepted but not used here; the temporal window is
        fixed inside ``__getSatNodeForLatLonAndTime``.
        """
        for r in self.primary:
            foundSatNodes = self.__getSatNodeForLatLonAndTime(tilesByDay, source, r["y"], r["x"], r["time"],
                                                              xyTolerance)
            self.griddedCount += 1
            self.griddedMatched += len(foundSatNodes)
            r["matches"].extend(foundSatNodes)

    def processInSitu(self, records, xyTolerance, timeTolerance):
        """Attach in-situ ``records`` that are spatially and temporally close to a primary record.

        A record matches a primary when it lies within ``xyTolerance`` (UTM distance) and its
        ``"time"`` differs by at most ``timeTolerance * 1000``. The time values are presumably
        milliseconds, with the tolerance in seconds. Nothing happens when there are no primaries.
        """
        if self.tree is not None:
            for s in records:
                self.insituCount += 1
                u = utm.from_latlon(s["y"], s["x"])
                coords = np.array([u[0], u[1], 0])
                ball = self.tree.query_ball_point(coords, xyTolerance)

                # Counts spatial candidates, before the time filter below.
                self.insituMatches += len(ball)

                for i in ball:
                    match = self.primary[i]
                    if abs(match["time"] - s["time"]) <= (timeTolerance * 1000.0):
                        match["matches"].append(s)

    def __getValueForLatLon(self, chunks, lat, lon, arrayName="data"):
        value = get_approximate_value_for_lat_lon(chunks, lat, lon, arrayName)
        return value

    def __checkNumber(self, value):
        """Return ``value`` as a Python float, or None when it is None or NaN.

        Converting before the NaN test also catches numeric types that are not ``float``
        subclasses (e.g. ``numpy.float32``). Those previously slipped through as a float NaN.
        """
        if value is None:
            return None
        value = float(value)
        return None if math.isnan(value) else value

    def __buildSwathIndexes(self, chunk):
        """Cache a KD-tree of the chunk's valid (unmasked, non-NaN) grid cells on ``chunk.swathIndexing``."""
        latlons = []
        utms = []
        indexes = []
        for i in range(0, len(chunk.latitudes)):
            _lat = chunk.latitudes[i]
            if isinstance(_lat, np.ma.core.MaskedConstant):
                continue
            for j in range(0, len(chunk.longitudes)):
                _lon = chunk.longitudes[j]
                if isinstance(_lon, np.ma.core.MaskedConstant):
                    continue

                value = self.__getChunkValueAtIndex(chunk, (i, j))
                if isinstance(value, float) and math.isnan(value):
                    continue

                u = utm.from_latlon(_lat, _lon)
                latlons.append((_lat, _lon))
                utms.append((u[0], u[1], 0.0))
                indexes.append((i, j))

        tree = None
        if len(latlons) > 0:
            tree = spatial.KDTree(utms)

        chunk.swathIndexing = {
            "tree": tree,
            "latlons": latlons,
            "indexes": indexes
        }

    def __getChunkIndexesForLatLon(self, chunk, lat, lon, xyTolerance):
        """Return ``(indexes, latlons)`` of chunk cells within ``xyTolerance`` of (lat, lon).

        Builds and caches the chunk's swath index on first use.
        """
        foundIndexes = []
        foundLatLons = []

        if "swathIndexing" not in chunk.__dict__:
            self.__buildSwathIndexes(chunk)

        tree = chunk.swathIndexing["tree"]
        if tree is not None:
            indexes = chunk.swathIndexing["indexes"]
            latlons = chunk.swathIndexing["latlons"]
            u = utm.from_latlon(lat, lon)
            coords = np.array([u[0], u[1], 0])
            ball = tree.query_ball_point(coords, xyTolerance)
            for i in ball:
                foundIndexes.append(indexes[i])
                foundLatLons.append(latlons[i])
        return foundIndexes, foundLatLons

    def __getChunkValueAtIndex(self, chunk, index, arrayName=None):
        """Read ``[0][row][col]`` from ``chunk.data`` (or from ``chunk.meta_data[arrayName]``).

        Returns a Python scalar, or ``float('Nan')`` when the cell is masked or not a single value.
        """
        if arrayName is None or arrayName == "data":
            data_val = chunk.data[0][index[0]][index[1]]
        else:
            data_val = chunk.meta_data[arrayName][0][index[0]][index[1]]
        return data_val.item() if (data_val is not np.ma.masked) and data_val.size == 1 else float('Nan')

    def __getSatNodeForLatLonAndTime(self, chunksByDay, source, lat, lon, searchTime, xyTolerance):
        """Build match nodes for every valid gridded cell near (lat, lon).

        Days are considered when ``abs(ts * 1000 - searchTime)`` is below ``timeDiff``. The window
        starts at one year in milliseconds.

        NOTE(review): after a node is found, ``timeDiff`` is reset to ``abs(ts - searchTime)``.
        That mixes seconds (``ts``) with milliseconds (``searchTime``), which effectively disables
        the window from then on. The logic is kept unchanged pending confirmation of the intent.
        """
        timeDiff = 86400 * 365 * 1000
        foundNodes = []

        for ts in chunksByDay:
            chunks = chunksByDay[ts]
            if abs((ts * 1000) - searchTime) < timeDiff:
                for chunk in chunks:
                    indexes, latlons = self.__getChunkIndexesForLatLon(chunk, lat, lon, xyTolerance)

                    for index, latlon in zip(indexes, latlons):
                        sst = None
                        sss = None
                        windSpeed = None
                        windDirection = None
                        windU = None
                        windV = None

                        value = self.__getChunkValueAtIndex(chunk, index)

                        if isinstance(value, float) and math.isnan(value):
                            continue

                        # The output field for the primary value depends on the dataset name.
                        if "GHRSST" in source:
                            sst = value
                        elif "ASCATB" in source:
                            windU = value
                        elif "SSS" in source:  # SMAP
                            sss = value

                        if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data:
                            windDirection = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_dir"))
                        if len(chunks) > 0 and "wind_v" in chunks[0].meta_data:
                            windV = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_v"))
                        if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data:
                            windSpeed = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_speed"))

                        foundNode = {
                            "sea_water_temperature": sst,
                            "sea_water_salinity": sss,
                            "wind_speed": windSpeed,
                            "wind_direction": windDirection,
                            "wind_u": windU,
                            "wind_v": windV,
                            "time": ts,
                            "x": self.__checkNumber(latlon[1]),
                            "y": self.__checkNumber(latlon[0]),
                            "depth": 0,
                            "sea_water_temperature_depth": 0,
                            "source": source,
                            # NOTE(review): built from the primary's lat/lon, so several cells matched
                            # on the same day share one id.
                            "id": "%s:%s:%s" % (ts, lat, lon)
                        }

                        foundNodes.append(foundNode)
                        timeDiff = abs(ts - searchTime)

        return foundNodes

    def __getSatNodeForLatLonAndTime__(self, chunksByDay, source, lat, lon, searchTime):
        """Older variant that samples one approximate value per day through
        ``get_approximate_value_for_lat_lon`` instead of a per-chunk KD-tree.

        NOTE(review): not called within this module; it uses the same seconds/milliseconds
        ``timeDiff`` update as ``__getSatNodeForLatLonAndTime``.
        """
        timeDiff = 86400 * 365 * 1000
        foundNodes = []

        for ts in chunksByDay:
            chunks = chunksByDay[ts]
            if abs((ts * 1000) - searchTime) < timeDiff:
                value = self.__getValueForLatLon(chunks, lat, lon, arrayName="data")
                value = self.__checkNumber(value)

                # _Really_ don't like doing it this way...

                sst = None
                sss = None
                windSpeed = None
                windDirection = None
                windU = None
                windV = None

                if "GHRSST" in source:
                    sst = value

                if "ASCATB" in source:
                    windU = value

                if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data:
                    windDirection = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_dir"))
                if len(chunks) > 0 and "wind_v" in chunks[0].meta_data:
                    windV = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_v"))
                if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data:
                    windSpeed = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_speed"))

                foundNode = {
                    "sea_water_temperature": sst,
                    "sea_water_salinity": sss,
                    "wind_speed": windSpeed,
                    "wind_direction": windDirection,
                    "wind_uv": {
                        "u": windU,
                        "v": windV
                    },
                    "time": ts,
                    "x": lon,
                    "y": lat,
                    "depth": 0,
                    "sea_water_temperature_depth": 0,
                    "source": source,
                    "id": "%s:%s:%s" % (ts, lat, lon)
                }

                # ASCATB nodes without a wind speed are dropped.
                isValidNode = True
                if "ASCATB" in source and windSpeed is None:
                    isValidNode = None

                if isValidNode:
                    foundNodes.append(foundNode)
                    timeDiff = abs(ts - searchTime)

        return foundNodes
-
-
class InsituDatasetProcessor:
    """Stream one in-situ datasource into a matchup context, page by page."""

    def __init__(self, primary, datasource, startTime, endTime, bbox, depth_min, depth_max, platforms, timeTolerance,
                 radiusTolerance):
        (self.primary, self.datasource, self.startTime, self.endTime, self.bbox,
         self.depth_min, self.depth_max, self.platforms, self.timeTolerance,
         self.radiusTolerance) = (primary, datasource, startTime, endTime, bbox,
                                  depth_min, depth_max, platforms, timeTolerance,
                                  radiusTolerance)

    def _handlePage(self, pageData):
        # Reads the tolerances at call time, so every fetched page uses the current settings.
        self.primary.processInSitu(pageData, self.radiusTolerance, self.timeTolerance)

    def start(self):
        """Fetch the datasource through fetchedgeimpl, feeding each page to ``primary.processInSitu``."""
        fetchedgeimpl.fetch(self.datasource, self.startTime, self.endTime, self.bbox, self.depth_min, self.depth_max,
                            self.platforms, pageCallback=self._handlePage)
-
-
class InsituPageProcessor:
    """Placeholder for per-page in-situ processing; it currently holds no state."""

    def __init__(self):
        return None
diff --git a/analysis/webservice/algorithms/doms/MetadataQuery.py b/analysis/webservice/algorithms/doms/MetadataQuery.py
deleted file mode 100644
index aa24d91..0000000
--- a/analysis/webservice/algorithms/doms/MetadataQuery.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-
-import requests
-
-import BaseDomsHandler
-import config
-from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
-from webservice.NexusHandler import nexus_handler
-from webservice.webmodel import DatasetNotFoundException
-
-
@nexus_handler
class DomsMetadataQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Proxy dataset metadata (UMM-JSON) for ``/domsmetadata``."""
    name = "DOMS Metadata Listing"
    path = "/domsmetadata"
    description = ""
    params = {}
    singleton = True

    def __init__(self):
        # Initialise through the direct DOMS base class, as the other DOMS handlers do, instead
        # of skipping it by calling NexusCalcHandler.__init__ directly.
        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)

    def calc(self, computeOptions, **args):
        """Fetch and return the metadata document for the ``dataset`` request argument.

        :raises Exception: when ``dataset`` is missing or empty
        :raises DatasetNotFoundException: when the metadata request or JSON decoding fails
        """
        dataset = computeOptions.get_argument("dataset", None)
        if dataset is None or len(dataset) == 0:
            raise Exception("'dataset' parameter not specified")

        metadataUrl = self.__getUrlForDataset(dataset)

        try:
            r = requests.get(metadataUrl)
            results = json.loads(r.text)
        except Exception:
            # Transport and decoding failures are reported as "not found". The message now
            # actually names the dataset, and KeyboardInterrupt/SystemExit are no longer trapped.
            raise DatasetNotFoundException("Dataset '%s' not found" % dataset)
        return BaseDomsHandler.DomsQueryResults(results=results)

    def __getUrlForDataset(self, dataset):
        """Return the metadata URL for ``dataset``.

        A configured endpoint's ``metadataUrl`` takes priority. Otherwise the DOMS metadata
        service URL is built, after mapping a few known NEXUS names to the service's short names.
        """
        datasetSpec = config.getEndpointByName(dataset)
        if datasetSpec is not None:
            return datasetSpec["metadataUrl"]

        # KMG: NOT a good hack
        if dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" or dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1_CLIM":
            dataset = "MUR-JPL-L4-GLOB-v4.1"
        elif dataset == "SMAP_L2B_SSS":
            dataset = "JPL_SMAP-SSS_L2_EVAL-V2"

        return "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json" % dataset
diff --git a/analysis/webservice/algorithms/doms/ResultsPlotQuery.py b/analysis/webservice/algorithms/doms/ResultsPlotQuery.py
deleted file mode 100644
index 1b48d14..0000000
--- a/analysis/webservice/algorithms/doms/ResultsPlotQuery.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import BaseDomsHandler
-import histogramplot
-import mapplot
-import scatterplot
-from webservice.NexusHandler import nexus_handler
-
-
class PlotTypes:
    """Values accepted by the ``type`` argument of the ``/domsplot`` handler."""

    SCATTER = "scatter"  # the default plot type
    MAP = "map"
    HISTOGRAM = "histogram"
-
-
@nexus_handler
class DomsResultsPlotHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Render a plot of stored matchup results (``/domsplot``)."""
    name = "DOMS Results Plotting"
    path = "/domsplot"
    description = ""
    params = {}
    singleton = True

    def __init__(self):
        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)

    def calc(self, computeOptions, **args):
        """Dispatch to the plotting module chosen by ``type`` (default: scatter).

        Arguments: ``id`` (execution id), ``parameter`` (default ``'sst'``), ``type`` and
        ``normandcurve`` (used only for histograms).

        :raises Exception: for an unsupported plot type
        """
        executionId = computeOptions.get_argument("id", None)
        parameter = computeOptions.get_argument('parameter', 'sst')
        plotType = computeOptions.get_argument("type", PlotTypes.SCATTER)
        normAndCurve = computeOptions.get_boolean_arg("normandcurve", False)

        if plotType == PlotTypes.SCATTER:
            return scatterplot.createScatterPlot(executionId, parameter)
        if plotType == PlotTypes.MAP:
            return mapplot.createMapPlot(executionId, parameter)
        if plotType == PlotTypes.HISTOGRAM:
            return histogramplot.createHistogramPlot(executionId, parameter, normAndCurve)
        raise Exception("Unsupported plot type '%s' specified." % plotType)
diff --git a/analysis/webservice/algorithms/doms/ResultsRetrieval.py b/analysis/webservice/algorithms/doms/ResultsRetrieval.py
deleted file mode 100644
index 93358e9..0000000
--- a/analysis/webservice/algorithms/doms/ResultsRetrieval.py
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import uuid
-
-import BaseDomsHandler
-import ResultsStorage
-from webservice.NexusHandler import nexus_handler
-from webservice.webmodel import NexusProcessingException
-
-
@nexus_handler
class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Return a previously stored matchup execution (``/domsresults``)."""
    name = "DOMS Resultset Retrieval"
    path = "/domsresults"
    description = ""
    params = {}
    singleton = True

    def __init__(self):
        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)

    def calc(self, computeOptions, **args):
        """Load the execution named by the ``id`` argument.

        ``simpleResults=true`` returns trimmed records.

        :raises NexusProcessingException: (400) when ``id`` is missing or not a valid UUID
        """
        execution_id = computeOptions.get_argument("id", None)

        try:
            execution_id = uuid.UUID(execution_id)
        except (TypeError, ValueError, AttributeError):
            # TypeError: id missing (None); ValueError: malformed string; AttributeError: non-string.
            # A bare ``except`` would also have swallowed KeyboardInterrupt/SystemExit.
            raise NexusProcessingException(reason="'id' argument must be a valid uuid", code=400)

        simple_results = computeOptions.get_boolean_arg("simpleResults", default=False)

        with ResultsStorage.ResultsRetrieval() as storage:
            params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results)

        return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=None,
                                                computeOptions=None, executionId=execution_id)
diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py
deleted file mode 100644
index 03bbd09..0000000
--- a/analysis/webservice/algorithms/doms/ResultsStorage.py
+++ /dev/null
@@ -1,286 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-
-import ConfigParser
-import logging
-import uuid
-from datetime import datetime
-
-import pkg_resources
-from cassandra.cluster import Cluster
-from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
-from cassandra.query import BatchStatement
-from pytz import UTC
-
-
class AbstractResultsContainer:
    """Context manager that opens a Cassandra session configured from ``domsconfig.ini``.

    ``__enter__`` reads the ``[cassandra]`` section (host, keyspace, local_datacenter,
    protocol_version), connects with a token-aware, DC-aware round-robin policy and stores the
    session on ``self._session``. ``__exit__`` shuts the cluster down.
    """

    def __init__(self):
        self._log = logging.getLogger(__name__)
        self._log.info("Creating DOMS Results Storage Instance")

        self._session = None
        # Set here so __exit__ is safe even if __enter__ never created a cluster.
        self._cluster = None

    def __enter__(self):
        domsconfig = ConfigParser.RawConfigParser()
        stream = pkg_resources.resource_stream(__name__, "domsconfig.ini")
        try:
            domsconfig.readfp(stream, filename='domsconfig.ini')
        finally:
            stream.close()

        cassHost = domsconfig.get("cassandra", "host")
        cassKeyspace = domsconfig.get("cassandra", "keyspace")
        cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
        cassVersion = int(domsconfig.get("cassandra", "protocol_version"))

        dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
        token_policy = TokenAwarePolicy(dc_policy)

        # Tolerate "a, b" style host lists and trailing commas.
        hosts = [host.strip() for host in cassHost.split(',') if host.strip()]
        self._cluster = Cluster(hosts, load_balancing_policy=token_policy, protocol_version=cassVersion)

        try:
            self._session = self._cluster.connect(cassKeyspace)
        except Exception:
            # __exit__ is not invoked when __enter__ raises, so release the cluster here.
            self._cluster.shutdown()
            self._cluster = None
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._cluster is not None:
            self._cluster.shutdown()

    def _parseDatetime(self, dtString):
        """Convert ``YYYY-MM-DDTHH:MM:SSZ`` to integer milliseconds since the Unix epoch."""
        dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
        epoch = datetime.utcfromtimestamp(0)
        time = (dt - epoch).total_seconds() * 1000.0
        return int(time)
-
-
class ResultsStorage(AbstractResultsContainer):
    """Write a DOMS execution (metadata, parameters, statistics, matched records) to Cassandra.

    Use as a context manager so the session opened by ``AbstractResultsContainer.__enter__``
    is available.
    """

    def __init__(self):
        AbstractResultsContainer.__init__(self)

    def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None):
        """Store one complete execution and return its UUID.

        :param results: primary record dicts, each optionally carrying a ``"matches"`` list
        :param params: request parameters (keys read by ``__insertParams``)
        :param stats: execution statistics (keys read by ``__insertStats``)
        :param execution_id: optional UUID or UUID string; a uuid4 is generated when None
        :return: the execution UUID
        """
        # ``basestring`` exists only on Python 2; this tuple is (str, unicode) there and (str, str) on 3.
        if isinstance(execution_id, (str, type(u""))):
            execution_id = uuid.UUID(execution_id)

        execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail)
        self.__insertParams(execution_id, params)
        self.__insertStats(execution_id, stats)
        self.__insertResults(execution_id, results)
        return execution_id

    def insertExecution(self, execution_id, startTime, completeTime, userEmail):
        """Insert the ``doms_executions`` row, generating a uuid4 when ``execution_id`` is None."""
        if execution_id is None:
            execution_id = uuid.uuid4()

        cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)"
        self._session.execute(cql, (execution_id, startTime, completeTime, userEmail))
        return execution_id

    def __insertParams(self, execution_id, params):
        """Insert the ``doms_params`` row.

        ``depthMin``, ``depthMax`` and ``parameter`` are optional and stored as NULL when absent.
        Not every caller supplies ``parameter``; requiring it made those inserts fail with KeyError.
        """
        cql = """INSERT INTO doms_params
            (execution_id, primary_dataset, matchup_datasets, depth_min, depth_max, time_tolerance, radius_tolerance, start_time, end_time, platforms, bounding_box, parameter)
            VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        matchup = params["matchup"]
        self._session.execute(cql, (execution_id,
                                    params["primary"],
                                    ",".join(matchup) if isinstance(matchup, list) else matchup,
                                    params.get("depthMin"),
                                    params.get("depthMax"),
                                    int(params["timeTolerance"]),
                                    params["radiusTolerance"],
                                    params["startTime"],
                                    params["endTime"],
                                    params["platforms"],
                                    params["bbox"],
                                    params.get("parameter")
                                    ))

    def __insertStats(self, execution_id, stats):
        """Insert the ``doms_execution_stats`` row from the handler's ``stats`` dict."""
        cql = """
            INSERT INTO doms_execution_stats
            (execution_id, num_gridded_matched, num_gridded_checked, num_insitu_matched, num_insitu_checked, time_to_complete)
            VALUES
            (%s, %s, %s, %s, %s, %s)
        """
        self._session.execute(cql, (
            execution_id,
            stats["numGriddedMatched"],
            stats["numGriddedChecked"],
            stats["numInSituMatched"],
            stats["numInSituRecords"],
            stats["timeToComplete"]
        ))

    def __insertResults(self, execution_id, results):
        """Insert every primary record and its matches into ``doms_data`` using one shared batch."""
        cql = """
            INSERT INTO doms_data
            (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values, is_primary)
            VALUES
            (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        insertStatement = self._session.prepare(cql)
        batch = BatchStatement()

        for result in results:
            self.__insertResult(execution_id, None, result, batch, insertStatement)

        # Each top-level record already commits (and clears) the batch, so this normally sends
        # whatever is left (possibly nothing).
        self._session.execute(batch)

    def __insertResult(self, execution_id, primaryId, result, batch, insertStatement):
        """Add ``result`` (and, recursively, its matches) to ``batch``.

        A record with ``primaryId`` None is a primary. For a primary, the batch is committed after
        every 20 matches and once more after all its matches have been added.
        """
        dataMap = self.__buildDataMap(result)
        result_id = uuid.uuid4()
        batch.add(insertStatement, (
            result_id,
            execution_id,
            result["id"],
            primaryId,
            result["x"],
            result["y"],
            result["source"],
            result["time"],
            result.get("platform"),
            result.get("device"),
            dataMap,
            1 if primaryId is None else 0
        )
        )

        n = 0
        if "matches" in result:
            for match in result["matches"]:
                self.__insertResult(execution_id, result["id"], match, batch, insertStatement)
                n += 1
                if n >= 20:
                    if primaryId is None:
                        self.__commitBatch(batch)
                    n = 0

        if primaryId is None:
            self.__commitBatch(batch)

    def __commitBatch(self, batch):
        """Execute ``batch`` and clear it for reuse."""
        self._session.execute(batch)
        batch.clear()

    def __buildDataMap(self, result):
        """Collect the record's measurement values (exactly ``float``/``int`` typed) for ``measurement_values``."""
        excluded = ("id", "x", "y", "source", "time", "platform", "device", "point", "matches")
        dataMap = {}
        for name in result:
            value = result[name]
            # Exact type check (as before): bools and numpy scalars are not stored.
            if name not in excluded and type(value) in [float, int]:
                dataMap[name] = value
        return dataMap
-
-
class ResultsRetrieval(AbstractResultsContainer):
    """Read a stored DOMS execution (parameters, statistics, matched records) from Cassandra.

    Use as a context manager so the session opened by ``AbstractResultsContainer.__enter__``
    is available.
    """

    def __init__(self):
        AbstractResultsContainer.__init__(self)

    def retrieveResults(self, execution_id, trim_data=False):
        """Return ``(params, stats, data)`` for ``execution_id`` (a UUID or UUID string).

        :param trim_data: when true, records omit ``id``, ``device`` and ``platform``
        :raises Exception: when no parameters or statistics are stored for the execution
        """
        # ``basestring`` exists only on Python 2; this tuple is (str, unicode) there and (str, str) on 3.
        if isinstance(execution_id, (str, type(u""))):
            execution_id = uuid.UUID(execution_id)

        params = self.__retrieveParams(execution_id)
        stats = self.__retrieveStats(execution_id)
        data = self.__retrieveData(execution_id, trim_data=trim_data)
        return params, stats, data

    def __retrieveData(self, execution_id, trim_data=False):
        """Return the primary records, each carrying its ``"matches"`` list when it has any."""
        dataMap = self.__retrievePrimaryData(execution_id, trim_data=trim_data)
        self.__enrichPrimaryDataWithMatches(execution_id, dataMap, trim_data=trim_data)
        return list(dataMap.values())

    def __enrichPrimaryDataWithMatches(self, execution_id, dataMap, trim_data=False):
        """Attach every non-primary row to its primary record, keyed by ``primary_value_id``."""
        cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = false"
        rows = self._session.execute(cql, (execution_id,))

        for row in rows:
            entry = self.__rowToDataEntry(row, trim_data=trim_data)
            if row.primary_value_id in dataMap:
                dataMap[row.primary_value_id].setdefault("matches", []).append(entry)
            else:
                # Orphaned match: report it through the logger rather than stdout.
                self._log.warning("Match %s references unknown primary record %s in execution %s",
                                  row.value_id, row.primary_value_id, execution_id)

    def __retrievePrimaryData(self, execution_id, trim_data=False):
        """Return ``{value_id: entry}`` for the execution's primary rows."""
        cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = true"
        rows = self._session.execute(cql, (execution_id,))

        dataMap = {}
        for row in rows:
            entry = self.__rowToDataEntry(row, trim_data=trim_data)
            dataMap[row.value_id] = entry
        return dataMap

    def __rowToDataEntry(self, row, trim_data=False):
        """Convert a ``doms_data`` row into a record dict with measurement values merged in as floats."""
        if trim_data:
            entry = {
                "x": float(row.x),
                "y": float(row.y),
                "source": row.source_dataset,
                "time": row.measurement_time.replace(tzinfo=UTC)
            }
        else:
            entry = {
                "id": row.value_id,
                "x": float(row.x),
                "y": float(row.y),
                "source": row.source_dataset,
                "device": row.device,
                "platform": row.platform,
                "time": row.measurement_time.replace(tzinfo=UTC)
            }
        # Cassandra returns null for an empty collection, so the map may come back as None.
        measurements = row.measurement_values or {}
        for key in measurements:
            entry[key] = float(measurements[key])
        return entry

    def __retrieveStats(self, execution_id):
        """Return the execution's statistics dict, or raise when none is stored."""
        cql = "SELECT * FROM doms_execution_stats where execution_id = %s limit 1"
        rows = self._session.execute(cql, (execution_id,))
        for row in rows:
            return {
                "numGriddedMatched": row.num_gridded_matched,
                "numGriddedChecked": row.num_gridded_checked,
                "numInSituMatched": row.num_insitu_matched,
                "numInSituChecked": row.num_insitu_checked,
                "timeToComplete": row.time_to_complete
            }

        raise Exception("Execution not found with id '%s'" % execution_id)

    def __retrieveParams(self, execution_id):
        """Return the execution's request parameters, or raise when none are stored."""
        cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
        rows = self._session.execute(cql, (execution_id,))
        for row in rows:
            return {
                "primary": row.primary_dataset,
                "matchup": row.matchup_datasets.split(","),
                "depthMin": row.depth_min,
                "depthMax": row.depth_max,
                "timeTolerance": row.time_tolerance,
                "radiusTolerance": row.radius_tolerance,
                "startTime": row.start_time.replace(tzinfo=UTC),
                "endTime": row.end_time.replace(tzinfo=UTC),
                "platforms": row.platforms,
                "bbox": row.bounding_box,
                "parameter": row.parameter
            }

        raise Exception("Execution not found with id '%s'" % execution_id)
diff --git a/analysis/webservice/algorithms/doms/StatsQuery.py b/analysis/webservice/algorithms/doms/StatsQuery.py
deleted file mode 100644
index f5ac765..0000000
--- a/analysis/webservice/algorithms/doms/StatsQuery.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import BaseDomsHandler
-import datafetch
-from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
-from webservice.NexusHandler import nexus_handler
-
-
@nexus_handler
class DomsStatsQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Report the number of in-situ records (and their bounds) for a source (``/domsstats``)."""
    name = "DOMS In-Situ Stats Lookup"
    path = "/domsstats"
    description = ""
    params = {}
    singleton = True

    def __init__(self):
        # Initialise through the direct DOMS base class, as the other DOMS handlers do, instead
        # of skipping it by calling NexusCalcHandler.__init__ directly.
        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)

    def calc(self, computeOptions, **args):
        """Count records of ``source`` matching the time, bbox, depth and platform filters.

        ``tt`` and ``rt`` are read only so they can be echoed back in the result arguments.

        :raises Exception: when ``source`` is not a known datasource
        """
        source = computeOptions.get_argument("source", None)
        startTime = computeOptions.get_argument("s", None)
        endTime = computeOptions.get_argument("e", None)
        bbox = computeOptions.get_argument("b", None)
        timeTolerance = computeOptions.get_float_arg("tt")
        depth_min = computeOptions.get_float_arg("depthMin", default=None)
        depth_max = computeOptions.get_float_arg("depthMax", default=None)
        radiusTolerance = computeOptions.get_float_arg("rt")
        platforms = computeOptions.get_argument("platforms", None)

        source1 = self.getDataSourceByName(source)
        if source1 is None:
            raise Exception("Source '%s' not found" % source)

        count, bounds = datafetch.getCount(source1, startTime, endTime, bbox, depth_min, depth_max, platforms)

        args = {
            "source": source,
            "startTime": startTime,
            "endTime": endTime,
            "bbox": bbox,
            "timeTolerance": timeTolerance,
            "depthMin": depth_min,
            "depthMax": depth_max,
            "radiusTolerance": radiusTolerance,
            "platforms": platforms
        }

        return BaseDomsHandler.DomsQueryResults(results={}, args=args, details={}, bounds=bounds, count=count,
                                                computeOptions=None)
diff --git a/analysis/webservice/algorithms/doms/ValuesQuery.py b/analysis/webservice/algorithms/doms/ValuesQuery.py
deleted file mode 100644
index d766c7b..0000000
--- a/analysis/webservice/algorithms/doms/ValuesQuery.py
+++ /dev/null
@@ -1,72 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from datetime import datetime
-
-from pytz import timezone
-
-import BaseDomsHandler
-import datafetch
-from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
-from webservice.NexusHandler import nexus_handler
-
-EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
-
-
-@nexus_handler
-class DomsValuesQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
- name = "DOMS In-Situ Value Lookup"
- path = "/domsvalues"
- description = ""
- params = {}
- singleton = True
-
- def __init__(self):
- BaseHandler.__init__(self)
-
- def calc(self, computeOptions, **args):
- source = computeOptions.get_argument("source", None)
- startTime = computeOptions.get_start_datetime()
- endTime = computeOptions.get_end_datetime()
- bbox = computeOptions.get_argument("b", None)
- timeTolerance = computeOptions.get_float_arg("tt")
- depth_min = computeOptions.get_float_arg("depthMin", default=None)
- depth_max = computeOptions.get_float_arg("depthMax", default=None)
- radiusTolerance = computeOptions.get_float_arg("rt")
- platforms = computeOptions.get_argument("platforms", "")
-
- source1 = self.getDataSourceByName(source)
- if source1 is None:
- raise Exception("Source '%s' not found" % source)
-
- values, bounds = datafetch.getValues(source1, startTime.strftime('%Y-%m-%dT%H:%M:%SZ'),
- endTime.strftime('%Y-%m-%dT%H:%M:%SZ'), bbox, depth_min, depth_max,
- platforms, placeholders=True)
- count = len(values)
-
- args = {
- "source": source,
- "startTime": startTime,
- "endTime": endTime,
- "bbox": bbox,
- "timeTolerance": timeTolerance,
- "depthMin": depth_min,
- "depthMax": depth_max,
- "radiusTolerance": radiusTolerance,
- "platforms": platforms
- }
-
- return BaseDomsHandler.DomsQueryResults(results=values, args=args, bounds=bounds, details={}, count=count,
- computeOptions=None)
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py
deleted file mode 100644
index d5a8e24..0000000
--- a/analysis/webservice/algorithms/doms/__init__.py
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import BaseDomsHandler
-import DatasetListQuery
-import DomsInitialization
-import MatchupQuery
-import MetadataQuery
-import ResultsPlotQuery
-import ResultsRetrieval
-import ResultsStorage
-import StatsQuery
-import ValuesQuery
-import config
-import datafetch
-import fetchedgeimpl
-import geo
-import insitusubset
-import subsetter
-import values
-import workerthread
diff --git a/analysis/webservice/algorithms/doms/config.py b/analysis/webservice/algorithms/doms/config.py
deleted file mode 100644
index ff492e8..0000000
--- a/analysis/webservice/algorithms/doms/config.py
+++ /dev/null
@@ -1,109 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-ENDPOINTS = [
- {
- "name": "samos",
- "url": "http://doms.coaps.fsu.edu:8890/ws/search/samos",
- "fetchParallel": True,
- "fetchThreads": 8,
- "itemsPerPage": 1000,
- "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SAMOS&format=umm-json"
- },
- {
- "name": "spurs",
- "url": "https://doms.jpl.nasa.gov/ws/search/spurs",
- "fetchParallel": True,
- "fetchThreads": 8,
- "itemsPerPage": 25000,
- "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-1&format=umm-json"
- },
- {
- "name": "icoads",
- "url": "http://rda-data.ucar.edu:8890/ws/search/icoads",
- "fetchParallel": True,
- "fetchThreads": 8,
- "itemsPerPage": 1000,
- "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=ICOADS&format=umm-json"
- },
- {
- "name": "spurs2",
- "url": "https://doms.jpl.nasa.gov/ws/search/spurs2",
- "fetchParallel": True,
- "fetchThreads": 8,
- "itemsPerPage": 25000,
- "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-2&format=umm-json"
- }
-]
-
-METADATA_LINKS = {
- "samos": "http://samos.coaps.fsu.edu/html/nav.php?s=2",
- "icoads": "https://rda.ucar.edu/datasets/ds548.1/",
- "spurs": "https://podaac.jpl.nasa.gov/spurs"
-}
-
-import os
-
-try:
- env = os.environ['ENV']
- if env == 'dev':
- ENDPOINTS = [
- {
- "name": "samos",
- "url": "http://doms.coaps.fsu.edu:8890/ws/search/samos",
- "fetchParallel": True,
- "fetchThreads": 8,
- "itemsPerPage": 1000,
- "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SAMOS&format=umm-json"
- },
- {
- "name": "spurs",
- "url": "http://127.0.0.1:8890/ws/search/spurs",
- "fetchParallel": True,
- "fetchThreads": 8,
- "itemsPerPage": 25000,
- "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-1&format=umm-json"
- },
- {
- "name": "icoads",
- "url": "http://rda-data.ucar.edu:8890/ws/search/icoads",
- "fetchParallel": True,
- "fetchThreads": 8,
- "itemsPerPage": 1000,
- "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=ICOADS&format=umm-json"
- },
- {
- "name": "spurs2",
- "url": "https://doms.jpl.nasa.gov/ws/search/spurs2",
- "fetchParallel": True,
- "fetchThreads": 8,
- "itemsPerPage": 25000,
- "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-2&format=umm-json"
- }
- ]
- METADATA_LINKS = {
- "samos": "http://samos.coaps.fsu.edu/html/nav.php?s=2",
- "icoads": "https://rda.ucar.edu/datasets/ds548.1/",
- "spurs": "https://podaac.jpl.nasa.gov/spurs"
- }
-except KeyError:
- pass
-
-
-def getEndpointByName(name):
- for endpoint in ENDPOINTS:
- if endpoint["name"].upper() == name.upper():
- return endpoint
- return None
diff --git a/analysis/webservice/algorithms/doms/datafetch.py b/analysis/webservice/algorithms/doms/datafetch.py
deleted file mode 100644
index 3fc3917..0000000
--- a/analysis/webservice/algorithms/doms/datafetch.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import fetchedgeimpl
-
-
-def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
- return fetchedgeimpl.getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
-
-
-def __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
- return fetchedgeimpl.fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
-
-
-def __fetchMultipleDataSource(endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
- data = []
- for endpoint in endpoints:
- dataSingleSource = __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
- data = data + dataSingleSource
- return data
-
-
-def fetchData(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
- if type(endpoint) == list:
- return __fetchMultipleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
- else:
- return __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
-
-
-def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False):
- return fetchedgeimpl.getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms, placeholders)
-
-
-if __name__ == "__main__":
- pass
diff --git a/analysis/webservice/algorithms/doms/domsconfig.ini.default b/analysis/webservice/algorithms/doms/domsconfig.ini.default
deleted file mode 100644
index d1814bf..0000000
--- a/analysis/webservice/algorithms/doms/domsconfig.ini.default
+++ /dev/null
@@ -1,15 +0,0 @@
-[cassandra]
-host=sdap-cassandra
-port=9042
-keyspace=doms
-local_datacenter=datacenter1
-protocol_version=3
-dc_policy=DCAwareRoundRobinPolicy
-
-
-[cassandraDD]
-host=128.149.115.178,128.149.115.173,128.149.115.176,128.149.115.175,128.149.115.172,128.149.115.174,128.149.115.177
-keyspace=doms
-local_datacenter=B600
-protocol_version=3
-
diff --git a/analysis/webservice/algorithms/doms/fetchedgeimpl.py b/analysis/webservice/algorithms/doms/fetchedgeimpl.py
deleted file mode 100644
index 70cf14e..0000000
--- a/analysis/webservice/algorithms/doms/fetchedgeimpl.py
+++ /dev/null
@@ -1,217 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-import traceback
-from datetime import datetime
-from multiprocessing.pool import ThreadPool
-
-import requests
-
-import geo
-import values
-from webservice.webmodel import NexusProcessingException
-
-
-def __parseDatetime(dtString):
- dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
- epoch = datetime.utcfromtimestamp(0)
- time = (dt - epoch).total_seconds() * 1000.0
- return time
-
-
-def __parseLocation(locString):
- if "Point" in locString:
- locString = locString[6:-1]
-
- if "," in locString:
- latitude = float(locString.split(",")[0])
- longitude = float(locString.split(",")[1])
- else:
- latitude = float(locString.split(" ")[1])
- longitude = float(locString.split(" ")[0])
-
- return (latitude, longitude)
-
-
-def __resultRawToUsable(resultdict):
- resultdict["time"] = __parseDatetime(resultdict["time"])
- latitude, longitude = __parseLocation(resultdict["point"])
-
- resultdict["x"] = longitude
- resultdict["y"] = latitude
-
- if "id" not in resultdict and "metadata" in resultdict:
- resultdict["id"] = resultdict["metadata"]
-
- resultdict["id"] = "id-%s" % resultdict["id"]
-
- if "device" in resultdict:
- resultdict["device"] = values.getDeviceById(resultdict["device"])
-
- if "platform" in resultdict:
- resultdict["platform"] = values.getPlatformById(resultdict["platform"])
-
- if "mission" in resultdict:
- resultdict["mission"] = values.getMissionById(resultdict["mission"])
-
- if "sea_surface_temperature" in resultdict:
- resultdict["sea_water_temperature"] = resultdict["sea_surface_temperature"]
- del resultdict["sea_surface_temperature"]
-
- return resultdict
-
-
-def __fetchJson(url, params, trycount=1, maxtries=5):
- if trycount > maxtries:
- raise Exception("Maximum retries attempted.")
- if trycount > 1:
- print "Retry #", trycount
- r = requests.get(url, params=params, timeout=500.000)
-
- print r.url
-
- if r.status_code != 200:
- return __fetchJson(url, params, trycount + 1, maxtries)
- try:
- results = json.loads(r.text)
- return results
- except:
- return __fetchJson(url, params, trycount + 1, maxtries)
-
-
-def __doQuery(endpoint, startTime, endTime, bbox, depth_min=None, depth_max=None, itemsPerPage=10, startIndex=0,
- platforms=None,
- pageCallback=None):
- params = {"startTime": startTime, "endTime": endTime, "bbox": bbox, "itemsPerPage": itemsPerPage,
- "startIndex": startIndex, "stats": "true"}
-
- if depth_min is not None:
- params['minDepth'] = depth_min
- if depth_max is not None:
- params['maxDepth'] = depth_max
-
- if platforms is not None:
- params["platform"] = platforms.split(",")
-
- resultsRaw = __fetchJson(endpoint["url"], params)
- boundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180)
-
- if resultsRaw["totalResults"] == 0 or len(resultsRaw["results"]) == 0: # Double-sanity check
- return [], resultsRaw["totalResults"], startIndex, itemsPerPage, boundsConstrainer
-
- try:
- results = []
- for resultdict in resultsRaw["results"]:
- result = __resultRawToUsable(resultdict)
- result["source"] = endpoint["name"]
- boundsConstrainer.testCoords(north=result["y"], south=result["y"], west=result["x"], east=result["x"])
- results.append(result)
-
- if "stats_fields" in resultsRaw and len(resultsRaw["results"]) == 0:
- stats = resultsRaw["stats_fields"]
- if "lat" in stats and "lon" in stats:
- boundsConstrainer.testCoords(north=stats['lat']['max'], south=stats['lat']['min'],
- west=stats['lon']['min'], east=stats['lon']['max'])
-
- if pageCallback is not None:
- pageCallback(results)
-
- '''
- If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it.
- '''
- if pageCallback is None:
- return results, int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int(
- resultsRaw["itemsPerPage"]), boundsConstrainer
- else:
- return [], int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int(
- resultsRaw["itemsPerPage"]), boundsConstrainer
- except:
- print "Invalid or missing JSON in response."
- traceback.print_exc()
- raise NexusProcessingException(reason="Invalid or missing JSON in response.")
- # return [], 0, startIndex, itemsPerPage, boundsConstrainer
-
-
-def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
- startIndex = 0
- pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime,
- endTime, bbox,
- depth_min, depth_max, 0,
- startIndex, platforms)
- return totalResults, boundsConstrainer
-
-
-def fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, pageCallback=None):
- results = []
- startIndex = 0
-
- mainBoundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180)
-
- # First isn't parellel so we can get the ttl results, forced items per page, etc...
- pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime,
- endTime, bbox,
- depth_min, depth_max,
- endpoint["itemsPerPage"],
- startIndex, platforms,
- pageCallback)
- results = results + pageResults
- mainBoundsConstrainer.testOtherConstrainer(boundsConstrainer)
-
- pool = ThreadPool(processes=endpoint["fetchThreads"])
- mpResults = [pool.apply_async(__doQuery, args=(
- endpoint, startTime, endTime, bbox, depth_min, depth_max, itemsPerPageR, x, platforms, pageCallback)) for x in
- range(len(pageResults), totalResults, itemsPerPageR)]
- pool.close()
- pool.join()
-
- '''
- If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it.
- '''
- if pageCallback is None:
- mpResults = [p.get() for p in mpResults]
- for mpResult in mpResults:
- results = results + mpResult[0]
- mainBoundsConstrainer.testOtherConstrainer(mpResult[4])
-
- return results, mainBoundsConstrainer
-
-
-def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False):
- results, boundsConstrainer = fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
-
- if placeholders:
- trimmedResults = []
- for item in results:
- depth = None
- if "depth" in item:
- depth = item["depth"]
- if "sea_water_temperature_depth" in item:
- depth = item["sea_water_temperature_depth"]
-
- trimmedItem = {
- "x": item["x"],
- "y": item["y"],
- "source": item["source"],
- "time": item["time"],
- "device": item["device"] if "device" in item else None,
- "platform": item["platform"],
- "depth": depth
- }
- trimmedResults.append(trimmedItem)
-
- results = trimmedResults
-
- return results, boundsConstrainer
diff --git a/analysis/webservice/algorithms/doms/geo.py b/analysis/webservice/algorithms/doms/geo.py
deleted file mode 100644
index 3323f57..0000000
--- a/analysis/webservice/algorithms/doms/geo.py
+++ /dev/null
@@ -1,129 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import math
-
-MEAN_RADIUS_EARTH_METERS = 6371010.0
-EQUATORIAL_RADIUS_EARTH_METERS = 6378140.0
-POLAR_RADIUS_EARTH_METERS = 6356752.0
-FLATTENING_EARTH = 298.257223563
-MEAN_RADIUS_EARTH_MILES = 3958.8
-
-
-class DistanceUnit(object):
- METERS = 0
- MILES = 1
-
-
-# Haversine implementation for great-circle distances between two points
-def haversine(x0, y0, x1, y1, units=DistanceUnit.METERS):
- if units == DistanceUnit.METERS:
- R = MEAN_RADIUS_EARTH_METERS
- elif units == DistanceUnit.MILES:
- R = MEAN_RADIUS_EARTH_MILES
- else:
- raise Exception("Invalid units specified")
- x0r = x0 * (math.pi / 180.0) # To radians
- x1r = x1 * (math.pi / 180.0) # To radians
- xd = (x1 - x0) * (math.pi / 180.0)
- yd = (y1 - y0) * (math.pi / 180.0)
-
- a = math.sin(xd / 2.0) * math.sin(xd / 2.0) + \
- math.cos(x0r) * math.cos(x1r) * \
- math.sin(yd / 2.0) * math.sin(yd / 2.0)
- c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
- d = R * c
- return d
-
-
-# Equirectangular approximation for when performance is key. Better at smaller distances
-def equirectangularApprox(x0, y0, x1, y1):
- R = 6371000.0 # Meters
- x0r = x0 * (math.pi / 180.0) # To radians
- x1r = x1 * (math.pi / 180.0)
- y0r = y0 * (math.pi / 180.0)
- y1r = y1 * (math.pi / 180.0)
-
- x = (y1r - y0r) * math.cos((x0r + x1r) / 2.0)
- y = x1r - x0r
- d = math.sqrt(x * x + y * y) * R
- return d
-
-
-class BoundingBox(object):
-
- def __init__(self, north=None, south=None, west=None, east=None, asString=None):
- if asString is not None:
- bboxParts = asString.split(",")
- self.west = float(bboxParts[0])
- self.south = float(bboxParts[1])
- self.east = float(bboxParts[2])
- self.north = float(bboxParts[3])
- else:
- self.north = north
- self.south = south
- self.west = west
- self.east = east
-
- def toString(self):
- return "%s,%s,%s,%s" % (self.west, self.south, self.east, self.north)
-
- def toMap(self):
- return {
- "xmin": self.west,
- "xmax": self.east,
- "ymin": self.south,
- "ymax": self.north
- }
-
-
-'''
- Constrains, does not expand.
-'''
-
-
-class BoundsConstrainer(BoundingBox):
-
- def __init__(self, north=None, south=None, west=None, east=None, asString=None):
- BoundingBox.__init__(self, north, south, west, east, asString)
-
- def testNorth(self, v):
- if v is None:
- return
- self.north = max([self.north, v])
-
- def testSouth(self, v):
- if v is None:
- return
- self.south = min([self.south, v])
-
- def testEast(self, v):
- if v is None:
- return
- self.east = max([self.east, v])
-
- def testWest(self, v):
- if v is None:
- return
- self.west = min([self.west, v])
-
- def testCoords(self, north=None, south=None, west=None, east=None):
- self.testNorth(north)
- self.testSouth(south)
- self.testWest(west)
- self.testEast(east)
-
- def testOtherConstrainer(self, other):
- self.testCoords(north=other.north, south=other.south, west=other.west, east=other.east)
diff --git a/analysis/webservice/algorithms/doms/histogramplot.py b/analysis/webservice/algorithms/doms/histogramplot.py
deleted file mode 100644
index 1e06b66..0000000
--- a/analysis/webservice/algorithms/doms/histogramplot.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import string
-from cStringIO import StringIO
-from multiprocessing import Process, Manager
-
-import matplotlib
-import matplotlib.mlab as mlab
-import matplotlib.pyplot as plt
-import numpy as np
-
-import BaseDomsHandler
-import ResultsStorage
-
-if not matplotlib.get_backend():
- matplotlib.use('Agg')
-
-PARAMETER_TO_FIELD = {
- "sst": "sea_water_temperature",
- "sss": "sea_water_salinity"
-}
-
-PARAMETER_TO_UNITS = {
- "sst": "($^\circ$C)",
- "sss": "(g/L)"
-}
-
-
-class DomsHistogramPlotQueryResults(BaseDomsHandler.DomsQueryResults):
-
- def __init__(self, x, parameter, primary, secondary, args=None, bounds=None, count=None, details=None,
- computeOptions=None, executionId=None, plot=None):
- BaseDomsHandler.DomsQueryResults.__init__(self, results=x, args=args, details=details, bounds=bounds,
- count=count, computeOptions=computeOptions, executionId=executionId)
- self.__primary = primary
- self.__secondary = secondary
- self.__x = x
- self.__parameter = parameter
- self.__plot = plot
-
- def toImage(self):
- return self.__plot
-
-
-def render(d, x, primary, secondary, parameter, norm_and_curve=False):
- fig, ax = plt.subplots()
- fig.suptitle(string.upper("%s vs. %s" % (primary, secondary)), fontsize=14, fontweight='bold')
-
- n, bins, patches = plt.hist(x, 50, normed=norm_and_curve, facecolor='green', alpha=0.75)
-
- if norm_and_curve:
- mean = np.mean(x)
- variance = np.var(x)
- sigma = np.sqrt(variance)
- y = mlab.normpdf(bins, mean, sigma)
- l = plt.plot(bins, y, 'r--', linewidth=1)
-
- ax.set_title('n = %d' % len(x))
-
- units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"]
- ax.set_xlabel("%s - %s %s" % (primary, secondary, units))
-
- if norm_and_curve:
- ax.set_ylabel("Probability per unit difference")
- else:
- ax.set_ylabel("Frequency")
-
- plt.grid(True)
-
- sio = StringIO()
- plt.savefig(sio, format='png')
- d['plot'] = sio.getvalue()
-
-
-def renderAsync(x, primary, secondary, parameter, norm_and_curve):
- manager = Manager()
- d = manager.dict()
- p = Process(target=render, args=(d, x, primary, secondary, parameter, norm_and_curve))
- p.start()
- p.join()
- return d['plot']
-
-
-def createHistogramPlot(id, parameter, norm_and_curve=False):
- with ResultsStorage.ResultsRetrieval() as storage:
- params, stats, data = storage.retrieveResults(id)
-
- primary = params["primary"]
- secondary = params["matchup"][0]
-
- x = createHistTable(data, secondary, parameter)
-
- plot = renderAsync(x, primary, secondary, parameter, norm_and_curve)
-
- r = DomsHistogramPlotQueryResults(x=x, parameter=parameter, primary=primary, secondary=secondary,
- args=params, details=stats,
- bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
- return r
-
-
-def createHistTable(results, secondary, parameter):
- x = []
-
- field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]
-
- for entry in results:
- for match in entry["matches"]:
- if match["source"] == secondary:
- if field in entry and field in match:
- a = entry[field]
- b = match[field]
- x.append((a - b))
-
- return x
diff --git a/analysis/webservice/algorithms/doms/insitusubset.py b/analysis/webservice/algorithms/doms/insitusubset.py
deleted file mode 100644
index 7f60e99..0000000
--- a/analysis/webservice/algorithms/doms/insitusubset.py
+++ /dev/null
@@ -1,263 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import StringIO
-import csv
-import json
-import logging
-from datetime import datetime
-
-import requests
-
-import BaseDomsHandler
-from webservice.NexusHandler import nexus_handler
-from webservice.algorithms.doms import config as edge_endpoints
-from webservice.webmodel import NexusProcessingException, NoDataException
-
-ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
-
-
-@nexus_handler
-class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
- name = "DOMS In Situ Subsetter"
- path = "/domsinsitusubset"
- description = "Subset a DOMS in situ source given the search domain."
-
- params = [
- {
- "name": "source",
- "type": "comma-delimited string",
- "description": "The in situ Dataset to be sub-setted",
- "required": "true",
- "sample": "spurs"
- },
- {
- "name": "parameter",
- "type": "string",
- "description": "The parameter of interest. One of 'sst', 'sss', 'wind'",
- "required": "false",
- "default": "All",
- "sample": "sss"
- },
- {
- "name": "startTime",
- "type": "string",
- "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH",
- "required": "true",
- "sample": "2013-10-21T00:00:00Z"
- },
- {
- "name": "endTime",
- "type": "string",
- "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH",
- "required": "true",
- "sample": "2013-10-31T23:59:59Z"
- },
- {
- "name": "b",
- "type": "comma-delimited float",
- "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
- "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
- "required": "true",
- "sample": "-30,15,-45,30"
- },
- {
- "name": "depthMin",
- "type": "float",
- "description": "Minimum depth of measurements. Must be less than depthMax",
- "required": "false",
- "default": "No limit",
- "sample": "0"
- },
- {
- "name": "depthMax",
- "type": "float",
- "description": "Maximum depth of measurements. Must be greater than depthMin",
- "required": "false",
- "default": "No limit",
- "sample": "5"
- },
- {
- "name": "platforms",
- "type": "comma-delimited integer",
- "description": "Platforms to include for subset consideration",
- "required": "false",
- "default": "All",
- "sample": "1,2,3,4,5,6,7,8,9"
- },
- {
- "name": "output",
- "type": "string",
- "description": "Output type. Only 'CSV' or 'JSON' is currently supported",
- "required": "false",
- "default": "JSON",
- "sample": "CSV"
- }
- ]
- singleton = True
-
- def __init__(self):
- BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)
- self.log = logging.getLogger(__name__)
-
- def parse_arguments(self, request):
- # Parse input arguments
- self.log.debug("Parsing arguments")
-
- source_name = request.get_argument('source', None)
- if source_name is None or source_name.strip() == '':
- raise NexusProcessingException(reason="'source' argument is required", code=400)
-
- parameter_s = request.get_argument('parameter', None)
- if parameter_s not in ['sst', 'sss', 'wind', None]:
- raise NexusProcessingException(
- reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
-
- try:
- start_time = request.get_start_datetime()
- start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
- except:
- raise NexusProcessingException(
- reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
- code=400)
- try:
- end_time = request.get_end_datetime()
- end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
- except:
- raise NexusProcessingException(
- reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
- code=400)
-
- if start_time > end_time:
- raise NexusProcessingException(
- reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
- request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
- code=400)
-
- try:
- bounding_polygon = request.get_bounding_polygon()
- except:
- raise NexusProcessingException(
- reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
- code=400)
-
- depth_min = request.get_decimal_arg('depthMin', default=None)
- depth_max = request.get_decimal_arg('depthMax', default=None)
-
- if depth_min is not None and depth_max is not None and depth_min >= depth_max:
- raise NexusProcessingException(
- reason="Depth Min should be less than Depth Max", code=400)
-
- platforms = request.get_argument('platforms', None)
- if platforms is not None:
- try:
- p_validation = platforms.split(',')
- p_validation = [int(p) for p in p_validation]
- del p_validation
- except:
- raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
-
- return source_name, parameter_s, start_time, end_time, bounding_polygon, depth_min, depth_max, platforms
-
- def calc(self, request, **args):
-
- source_name, parameter_s, start_time, end_time, bounding_polygon, \
- depth_min, depth_max, platforms = self.parse_arguments(request)
-
- with requests.session() as edge_session:
- edge_results = query_edge(source_name, parameter_s, start_time, end_time,
- ','.join([str(bound) for bound in bounding_polygon.bounds]),
- platforms, depth_min, depth_max, edge_session)['results']
-
- if len(edge_results) == 0:
- raise NoDataException
- return InSituSubsetResult(results=edge_results)
-
-
-class InSituSubsetResult(object):
- def __init__(self, results):
- self.results = results
-
- def toJson(self):
- return json.dumps(self.results, indent=4)
-
- def toCSV(self):
- fieldnames = sorted(next(iter(self.results)).keys())
-
- csv_mem_file = StringIO.StringIO()
- try:
- writer = csv.DictWriter(csv_mem_file, fieldnames=fieldnames)
-
- writer.writeheader()
- writer.writerows(self.results)
- csv_out = csv_mem_file.getvalue()
- finally:
- csv_mem_file.close()
-
- return csv_out
-
-
-def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, session, itemsPerPage=1000,
- startIndex=0, stats=True):
- log = logging.getLogger('webservice.algorithms.doms.insitusubset.query_edge')
- try:
- startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
- except TypeError:
- # Assume we were passed a properly formatted string
- pass
-
- try:
- endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ')
- except TypeError:
- # Assume we were passed a properly formatted string
- pass
-
- try:
- platform = platform.split(',')
- except AttributeError:
- # Assume we were passed a list
- pass
-
- params = {"startTime": startTime,
- "endTime": endTime,
- "bbox": bbox,
- "minDepth": depth_min,
- "maxDepth": depth_max,
- "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
-
- if variable:
- params['variable'] = variable
- if platform:
- params['platform'] = platform
-
- edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
-
- edge_request.raise_for_status()
- edge_response = json.loads(edge_request.text)
-
- # Get all edge results
- next_page_url = edge_response.get('next', None)
- while next_page_url is not None:
- log.debug("requesting %s" % next_page_url)
- edge_page_request = session.get(next_page_url)
-
- edge_page_request.raise_for_status()
- edge_page_response = json.loads(edge_page_request.text)
-
- edge_response['results'].extend(edge_page_response['results'])
-
- next_page_url = edge_page_response.get('next', None)
-
- return edge_response
diff --git a/analysis/webservice/algorithms/doms/mapplot.py b/analysis/webservice/algorithms/doms/mapplot.py
deleted file mode 100644
index 3af85d3..0000000
--- a/analysis/webservice/algorithms/doms/mapplot.py
+++ /dev/null
@@ -1,175 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import string
-from cStringIO import StringIO
-from multiprocessing import Process, Manager
-
-import matplotlib
-import matplotlib.pyplot as plt
-import numpy as np
-from mpl_toolkits.basemap import Basemap
-
-import BaseDomsHandler
-import ResultsStorage
-
-if not matplotlib.get_backend():
- matplotlib.use('Agg')
-
-PARAMETER_TO_FIELD = {
- "sst": "sea_water_temperature",
- "sss": "sea_water_salinity"
-}
-
-PARAMETER_TO_UNITS = {
- "sst": "($^\circ$ C)",
- "sss": "(g/L)"
-}
-
-
-def __square(minLon, maxLon, minLat, maxLat):
- if maxLat - minLat > maxLon - minLon:
- a = ((maxLat - minLat) - (maxLon - minLon)) / 2.0
- minLon -= a
- maxLon += a
- elif maxLon - minLon > maxLat - minLat:
- a = ((maxLon - minLon) - (maxLat - minLat)) / 2.0
- minLat -= a
- maxLat += a
-
- return minLon, maxLon, minLat, maxLat
-
-
-def render(d, lats, lons, z, primary, secondary, parameter):
- fig = plt.figure()
- ax = fig.add_axes([0.1, 0.1, 0.8, 0.8])
-
- ax.set_title(string.upper("%s vs. %s" % (primary, secondary)))
- # ax.set_ylabel('Latitude')
- # ax.set_xlabel('Longitude')
-
- minLatA = np.min(lats)
- maxLatA = np.max(lats)
- minLonA = np.min(lons)
- maxLonA = np.max(lons)
-
- minLat = minLatA - (abs(maxLatA - minLatA) * 0.1)
- maxLat = maxLatA + (abs(maxLatA - minLatA) * 0.1)
-
- minLon = minLonA - (abs(maxLonA - minLonA) * 0.1)
- maxLon = maxLonA + (abs(maxLonA - minLonA) * 0.1)
-
- minLon, maxLon, minLat, maxLat = __square(minLon, maxLon, minLat, maxLat)
-
- # m = Basemap(projection='mill', llcrnrlon=-180,llcrnrlat=-80,urcrnrlon=180,urcrnrlat=80,resolution='l')
- m = Basemap(projection='mill', llcrnrlon=minLon, llcrnrlat=minLat, urcrnrlon=maxLon, urcrnrlat=maxLat,
- resolution='l')
-
- m.drawparallels(np.arange(minLat, maxLat, (maxLat - minLat) / 5.0), labels=[1, 0, 0, 0], fontsize=10)
- m.drawmeridians(np.arange(minLon, maxLon, (maxLon - minLon) / 5.0), labels=[0, 0, 0, 1], fontsize=10)
-
- m.drawcoastlines()
- m.drawmapboundary(fill_color='#99ffff')
- m.fillcontinents(color='#cc9966', lake_color='#99ffff')
-
- # lats, lons = np.meshgrid(lats, lons)
-
- masked_array = np.ma.array(z, mask=np.isnan(z))
- z = masked_array
-
- values = np.zeros(len(z))
- for i in range(0, len(z)):
- values[i] = ((z[i] - np.min(z)) / (np.max(z) - np.min(z)) * 20.0) + 10
-
- x, y = m(lons, lats)
-
- im1 = m.scatter(x, y, values)
-
- im1.set_array(z)
- cb = m.colorbar(im1)
-
- units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"]
- cb.set_label("Difference %s" % units)
-
- sio = StringIO()
- plt.savefig(sio, format='png')
- plot = sio.getvalue()
- if d is not None:
- d['plot'] = plot
- return plot
-
-
-class DomsMapPlotQueryResults(BaseDomsHandler.DomsQueryResults):
- def __init__(self, lats, lons, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None,
- computeOptions=None, executionId=None, plot=None):
- BaseDomsHandler.DomsQueryResults.__init__(self, results={"lats": lats, "lons": lons, "values": z}, args=args,
- details=details, bounds=bounds, count=count,
- computeOptions=computeOptions, executionId=executionId)
- self.__lats = lats
- self.__lons = lons
- self.__z = np.array(z)
- self.__parameter = parameter
- self.__primary = primary
- self.__secondary = secondary
- self.__plot = plot
-
- def toImage(self):
- return self.__plot
-
-
-def renderAsync(x, y, z, primary, secondary, parameter):
- manager = Manager()
- d = manager.dict()
- p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter))
- p.start()
- p.join()
- return d['plot']
-
-
-def createMapPlot(id, parameter):
- with ResultsStorage.ResultsRetrieval() as storage:
- params, stats, data = storage.retrieveResults(id)
-
- primary = params["primary"]
- secondary = params["matchup"][0]
-
- lats = []
- lons = []
- z = []
-
- field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]
-
- for entry in data:
- for match in entry["matches"]:
- if match["source"] == secondary:
-
- if field in entry and field in match:
- a = entry[field]
- b = match[field]
- z.append((a - b))
- z.append((a - b))
- else:
- z.append(1.0)
- z.append(1.0)
- lats.append(entry["y"])
- lons.append(entry["x"])
- lats.append(match["y"])
- lons.append(match["x"])
-
- plot = renderAsync(lats, lons, z, primary, secondary, parameter)
- r = DomsMapPlotQueryResults(lats=lats, lons=lons, z=z, parameter=parameter, primary=primary, secondary=secondary,
- args=params,
- details=stats, bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
- return r
diff --git a/analysis/webservice/algorithms/doms/scatterplot.py b/analysis/webservice/algorithms/doms/scatterplot.py
deleted file mode 100644
index 2ff57ee..0000000
--- a/analysis/webservice/algorithms/doms/scatterplot.py
+++ /dev/null
@@ -1,118 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import string
-from cStringIO import StringIO
-from multiprocessing import Process, Manager
-
-import matplotlib
-import matplotlib.pyplot as plt
-
-import BaseDomsHandler
-import ResultsStorage
-
-if not matplotlib.get_backend():
- matplotlib.use('Agg')
-
-PARAMETER_TO_FIELD = {
- "sst": "sea_water_temperature",
- "sss": "sea_water_salinity"
-}
-
-PARAMETER_TO_UNITS = {
- "sst": "($^\circ$ C)",
- "sss": "(g/L)"
-}
-
-
-def render(d, x, y, z, primary, secondary, parameter):
- fig, ax = plt.subplots()
-
- ax.set_title(string.upper("%s vs. %s" % (primary, secondary)))
-
- units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS[
- "sst"]
- ax.set_ylabel("%s %s" % (secondary, units))
- ax.set_xlabel("%s %s" % (primary, units))
-
- ax.scatter(x, y)
-
- sio = StringIO()
- plt.savefig(sio, format='png')
- d['plot'] = sio.getvalue()
-
-
-class DomsScatterPlotQueryResults(BaseDomsHandler.DomsQueryResults):
-
- def __init__(self, x, y, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None,
- computeOptions=None, executionId=None, plot=None):
- BaseDomsHandler.DomsQueryResults.__init__(self, results=[x, y], args=args, details=details, bounds=bounds,
- count=count, computeOptions=computeOptions, executionId=executionId)
- self.__primary = primary
- self.__secondary = secondary
- self.__x = x
- self.__y = y
- self.__z = z
- self.__parameter = parameter
- self.__plot = plot
-
- def toImage(self):
- return self.__plot
-
-
-def renderAsync(x, y, z, primary, secondary, parameter):
- manager = Manager()
- d = manager.dict()
- p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter))
- p.start()
- p.join()
- return d['plot']
-
-
-def createScatterPlot(id, parameter):
- with ResultsStorage.ResultsRetrieval() as storage:
- params, stats, data = storage.retrieveResults(id)
-
- primary = params["primary"]
- secondary = params["matchup"][0]
-
- x, y, z = createScatterTable(data, secondary, parameter)
-
- plot = renderAsync(x, y, z, primary, secondary, parameter)
-
- r = DomsScatterPlotQueryResults(x=x, y=y, z=z, parameter=parameter, primary=primary, secondary=secondary,
- args=params, details=stats,
- bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
- return r
-
-
-def createScatterTable(results, secondary, parameter):
- x = []
- y = []
- z = []
-
- field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]
-
- for entry in results:
- for match in entry["matches"]:
- if match["source"] == secondary:
- if field in entry and field in match:
- a = entry[field]
- b = match[field]
- x.append(a)
- y.append(b)
- z.append(a - b)
-
- return x, y, z
diff --git a/analysis/webservice/algorithms/doms/subsetter.py b/analysis/webservice/algorithms/doms/subsetter.py
deleted file mode 100644
index 67a2276..0000000
--- a/analysis/webservice/algorithms/doms/subsetter.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import os
-import tempfile
-import zipfile
-from datetime import datetime
-
-import requests
-
-import BaseDomsHandler
-from webservice.NexusHandler import nexus_handler
-from webservice.webmodel import NexusProcessingException
-
-ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
-
-
-def is_blank(my_string):
- return not (my_string and my_string.strip() != '')
-
-
-@nexus_handler
-class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
- name = "DOMS Subsetter"
- path = "/domssubset"
- description = "Subset DOMS sources given the search domain"
-
- params = {
- "dataset": {
- "name": "NEXUS Dataset",
- "type": "string",
- "description": "The NEXUS dataset. Optional but at least one of 'dataset' or 'insitu' are required"
- },
- "insitu": {
- "name": "In Situ sources",
- "type": "comma-delimited string",
- "description": "The in situ source(s). Optional but at least one of 'dataset' or 'insitu' are required"
- },
- "parameter": {
- "name": "Data Parameter",
- "type": "string",
- "description": "The parameter of interest. One of 'sst', 'sss', 'wind'. Required"
- },
- "startTime": {
- "name": "Start Time",
- "type": "string",
- "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
- },
- "endTime": {
- "name": "End Time",
- "type": "string",
- "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
- },
- "b": {
- "name": "Bounding box",
- "type": "comma-delimited float",
- "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
- "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
- },
- "depthMin": {
- "name": "Minimum Depth",
- "type": "float",
- "description": "Minimum depth of measurements. Must be less than depthMax. Optional"
- },
- "depthMax": {
- "name": "Maximum Depth",
- "type": "float",
- "description": "Maximum depth of measurements. Must be greater than depthMin. Optional"
- },
- "platforms": {
- "name": "Platforms",
- "type": "comma-delimited integer",
- "description": "Platforms to include for subset consideration. Optional"
- },
- "output": {
- "name": "Output",
- "type": "string",
- "description": "Output type. Only 'ZIP' is currently supported. Required"
- }
- }
- singleton = True
-
- def __init__(self):
- BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)
- self.log = logging.getLogger(__name__)
-
- def parse_arguments(self, request):
- # Parse input arguments
- self.log.debug("Parsing arguments")
-
- primary_ds_name = request.get_argument('dataset', None)
- matchup_ds_names = request.get_argument('insitu', None)
-
- if is_blank(primary_ds_name) and is_blank(matchup_ds_names):
- raise NexusProcessingException(reason="Either 'dataset', 'insitu', or both arguments are required",
- code=400)
-
- if matchup_ds_names is not None:
- try:
- matchup_ds_names = matchup_ds_names.split(',')
- except:
- raise NexusProcessingException(reason="'insitu' argument should be a comma-seperated list", code=400)
-
- parameter_s = request.get_argument('parameter', None)
- if parameter_s not in ['sst', 'sss', 'wind']:
- raise NexusProcessingException(
- reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
-
- try:
- start_time = request.get_start_datetime()
- start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
- except:
- raise NexusProcessingException(
- reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
- code=400)
- try:
- end_time = request.get_end_datetime()
- end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
- except:
- raise NexusProcessingException(
- reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
- code=400)
-
- if start_time > end_time:
- raise NexusProcessingException(
- reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
- request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
- code=400)
-
- try:
- bounding_polygon = request.get_bounding_polygon()
- except:
- raise NexusProcessingException(
- reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
- code=400)
-
- depth_min = request.get_decimal_arg('depthMin', default=None)
- depth_max = request.get_decimal_arg('depthMax', default=None)
-
- if depth_min is not None and depth_max is not None and depth_min >= depth_max:
- raise NexusProcessingException(
- reason="Depth Min should be less than Depth Max", code=400)
-
- platforms = request.get_argument('platforms', None)
- if platforms is not None:
- try:
- p_validation = platforms.split(',')
- p_validation = [int(p) for p in p_validation]
- del p_validation
- except:
- raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
-
- return primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \
- bounding_polygon, depth_min, depth_max, platforms
-
- def calc(self, request, **args):
-
- primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \
- bounding_polygon, depth_min, depth_max, platforms = self.parse_arguments(request)
-
- primary_url = "https://doms.jpl.nasa.gov/datainbounds"
- primary_params = {
- 'ds': primary_ds_name,
- 'parameter': parameter_s,
- 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]),
- 'startTime': start_time,
- 'endTime': end_time,
- 'output': "CSV"
- }
-
- matchup_url = "https://doms.jpl.nasa.gov/domsinsitusubset"
- matchup_params = {
- 'source': None,
- 'parameter': parameter_s,
- 'startTime': start_time,
- 'endTime': end_time,
- 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]),
- 'depthMin': depth_min,
- 'depthMax': depth_max,
- 'platforms': platforms,
- 'output': 'CSV'
- }
-
- primary_temp_file_path = None
- matchup_downloads = None
-
- with requests.session() as session:
-
- if not is_blank(primary_ds_name):
- # Download primary
- primary_temp_file, primary_temp_file_path = tempfile.mkstemp(suffix='.csv')
- download_file(primary_url, primary_temp_file_path, session, params=primary_params)
-
- if len(matchup_ds_names) > 0:
- # Download matchup
- matchup_downloads = {}
- for matchup_ds in matchup_ds_names:
- matchup_downloads[matchup_ds] = tempfile.mkstemp(suffix='.csv')
- matchup_params['source'] = matchup_ds
- download_file(matchup_url, matchup_downloads[matchup_ds][1], session, params=matchup_params)
-
- # Zip downloads
- date_range = "%s-%s" % (datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"),
- datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"))
- bounds = '%.4fW_%.4fS_%.4fE_%.4fN' % bounding_polygon.bounds
- zip_dir = tempfile.mkdtemp()
- zip_path = '%s/subset.%s.%s.zip' % (zip_dir, date_range, bounds)
- with zipfile.ZipFile(zip_path, 'w') as my_zip:
- if primary_temp_file_path:
- my_zip.write(primary_temp_file_path, arcname='%s.%s.%s.csv' % (primary_ds_name, date_range, bounds))
- if matchup_downloads:
- for matchup_ds, download in matchup_downloads.iteritems():
- my_zip.write(download[1], arcname='%s.%s.%s.csv' % (matchup_ds, date_range, bounds))
-
- # Clean up
- if primary_temp_file_path:
- os.remove(primary_temp_file_path)
- if matchup_downloads:
- for matchup_ds, download in matchup_downloads.iteritems():
- os.remove(download[1])
-
- return SubsetResult(zip_path)
-
-
-class SubsetResult(object):
- def __init__(self, zip_path):
- self.zip_path = zip_path
-
- def toJson(self):
- raise NotImplementedError
-
- def toZip(self):
- with open(self.zip_path, 'rb') as zip_file:
- zip_contents = zip_file.read()
-
- return zip_contents
-
- def cleanup(self):
- os.remove(self.zip_path)
-
-
-def download_file(url, filepath, session, params=None):
- r = session.get(url, params=params, stream=True)
- with open(filepath, 'wb') as f:
- for chunk in r.iter_content(chunk_size=1024):
- if chunk: # filter out keep-alive new chunks
- f.write(chunk)
diff --git a/analysis/webservice/algorithms/doms/values.py b/analysis/webservice/algorithms/doms/values.py
deleted file mode 100644
index c47d450..0000000
--- a/analysis/webservice/algorithms/doms/values.py
+++ /dev/null
@@ -1,72 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-PLATFORMS = [
- {"id": 1, "desc": "ship"},
- {"id": 2, "desc": "moored surface buoy"},
- {"id": 3, "desc": "drifting surface float"},
- {"id": 4, "desc": "drifting subsurface profiling float"},
- {"id": 5, "desc": "autonomous underwater vehicle"},
- {"id": 6, "desc": "offshore structure"},
- {"id": 7, "desc": "coastal structure"},
- {"id": 8, "desc": "towed unmanned submersible"},
- {"id": 9, "desc": "orbiting satellite"}
-]
-
-DEVICES = [
- {"id": 1, "desc": "bathythermographs"},
- {"id": 2, "desc": "discrete water samplers"},
- {"id": 3, "desc": "CTD"},
- {"id": 4, "desc": "Current profilers / acousticDopplerCurrentProfiler"},
- {"id": 5, "desc": "radiometers"},
- {"id": 6, "desc": "scatterometers"}
-]
-
-MISSIONS = [
- {"id": 1, "desc": "SAMOS"},
- {"id": 2, "desc": "ICOADS"},
- {"id": 3, "desc": "Aquarius"},
- {"id": 4, "desc": "SPURS1"}
-]
-
-
-def getDescById(list, id):
- for item in list:
- if item["id"] == id:
- return item["desc"]
- return id
-
-
-def getPlatformById(id):
- return getDescById(PLATFORMS, id)
-
-
-def getDeviceById(id):
- return getDescById(DEVICES, id)
-
-
-def getMissionById(id):
- return getDescById(MISSIONS, id)
-
-
-def getDescByListNameAndId(listName, id):
- if listName.upper() == "PLATFORM":
- return getPlatformById(id)
- elif listName.upper() == "DEVICE":
- return getDeviceById(id)
- elif listName.upper() == "MISSION":
- return getMissionById(id)
- else:
- raise Exception("Invalid list name specified ('%s')" % listName)
diff --git a/analysis/webservice/algorithms/doms/workerthread.py b/analysis/webservice/algorithms/doms/workerthread.py
deleted file mode 100644
index 7639c00..0000000
--- a/analysis/webservice/algorithms/doms/workerthread.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import threading
-
-
-class WorkerThread(threading.Thread):
-
- def __init__(self, method, params):
- threading.Thread.__init__(self)
- self.method = method
- self.params = params
- self.completed = False
- self.results = None
-
- def run(self):
- self.results = self.method(*self.params)
- self.completed = True
-
-
-def __areAllComplete(threads):
- for thread in threads:
- if not thread.completed:
- return False
-
- return True
-
-
-def wait(threads, startFirst=False, poll=0.5):
- if startFirst:
- for thread in threads:
- thread.start()
-
- while not __areAllComplete(threads):
- threading._sleep(poll)
-
-
-def foo(param1, param2):
- print param1, param2
- return "c"
-
-
-if __name__ == "__main__":
-
- thread = WorkerThread(foo, params=("a", "b"))
- thread.start()
- while not thread.completed:
- threading._sleep(0.5)
- print thread.results
diff --git a/analysis/webservice/algorithms_spark/Matchup.py b/analysis/webservice/algorithms_spark/Matchup.py
deleted file mode 100644
index 9ae7557..0000000
--- a/analysis/webservice/algorithms_spark/Matchup.py
+++ /dev/null
@@ -1,703 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-
-import json
-import logging
-import threading
-from datetime import datetime
-from itertools import chain
-from math import cos, radians
-
-import numpy as np
-import pyproj
-import requests
-from nexustiles.nexustiles import NexusTileService
-from pytz import timezone, UTC
-from scipy import spatial
-from shapely import wkt
-from shapely.geometry import Point
-from shapely.geometry import box
-from shapely.geos import WKTReadingError
-
-from webservice.NexusHandler import nexus_handler
-from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler
-from webservice.algorithms.doms import config as edge_endpoints
-from webservice.algorithms.doms import values as doms_values
-from webservice.algorithms.doms.BaseDomsHandler import DomsQueryResults
-from webservice.algorithms.doms.ResultsStorage import ResultsStorage
-from webservice.webmodel import NexusProcessingException
-
-EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
-ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
-
-
-def iso_time_to_epoch(str_time):
- return (datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%SZ").replace(
- tzinfo=UTC) - EPOCH).total_seconds()
-
-
-@nexus_handler
-class Matchup(NexusCalcSparkHandler):
- name = "Matchup"
- path = "/match_spark"
- description = "Match measurements between two or more datasets"
-
- params = {
- "primary": {
- "name": "Primary Dataset",
- "type": "string",
- "description": "The Primary dataset used to find matches for. Required"
- },
- "matchup": {
- "name": "Match-Up Datasets",
- "type": "comma-delimited string",
- "description": "The Dataset(s) being searched for measurements that match the Primary. Required"
- },
- "parameter": {
- "name": "Match-Up Parameter",
- "type": "string",
- "description": "The parameter of interest used for the match up. One of 'sst', 'sss', 'wind'. Required"
- },
- "startTime": {
- "name": "Start Time",
- "type": "string",
- "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
- },
- "endTime": {
- "name": "End Time",
- "type": "string",
- "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required"
- },
- "b": {
- "name": "Bounding box",
- "type": "comma-delimited float",
- "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
- "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required"
- },
- "depthMin": {
- "name": "Minimum Depth",
- "type": "float",
- "description": "Minimum depth of measurements. Must be less than depthMax. Optional. Default: no limit"
- },
- "depthMax": {
- "name": "Maximum Depth",
- "type": "float",
- "description": "Maximum depth of measurements. Must be greater than depthMin. Optional. Default: no limit"
- },
- "tt": {
- "name": "Time Tolerance",
- "type": "long",
- "description": "Tolerance in time (seconds) when comparing two measurements. Optional. Default: 86400"
- },
- "rt": {
- "name": "Radius Tolerance",
- "type": "float",
- "description": "Tolerance in radius (meters) when comparing two measurements. Optional. Default: 1000"
- },
- "platforms": {
- "name": "Platforms",
- "type": "comma-delimited integer",
- "description": "Platforms to include for matchup consideration. Required"
- },
- "matchOnce": {
- "name": "Match Once",
- "type": "boolean",
- "description": "Optional True/False flag used to determine if more than one match per primary point is returned. "
- + "If true, only the nearest point will be returned for each primary point. "
- + "If false, all points within the tolerances will be returned for each primary point. Default: False"
- },
- "resultSizeLimit": {
- "name": "Result Size Limit",
- "type": "int",
- "description": "Optional integer value that limits the number of results returned from the matchup. "
- "If the number of primary matches is greater than this limit, the service will respond with "
- "(HTTP 202: Accepted) and an empty response body. A value of 0 means return all results. "
- "Default: 500"
- }
- }
- singleton = True
-
- def __init__(self, algorithm_config=None, sc=None):
- NexusCalcSparkHandler.__init__(self, algorithm_config=algorithm_config, sc=sc, skipCassandra=True)
- self.log = logging.getLogger(__name__)
-
- def parse_arguments(self, request):
- # Parse input arguments
- self.log.debug("Parsing arguments")
- try:
- bounding_polygon = request.get_bounding_polygon()
- except:
- raise NexusProcessingException(
- reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
- code=400)
- primary_ds_name = request.get_argument('primary', None)
- if primary_ds_name is None:
- raise NexusProcessingException(reason="'primary' argument is required", code=400)
- matchup_ds_names = request.get_argument('matchup', None)
- if matchup_ds_names is None:
- raise NexusProcessingException(reason="'matchup' argument is required", code=400)
-
- parameter_s = request.get_argument('parameter', 'sst')
- if parameter_s not in ['sst', 'sss', 'wind']:
- raise NexusProcessingException(
- reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)
-
- try:
- start_time = request.get_start_datetime()
- except:
- raise NexusProcessingException(
- reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
- code=400)
- try:
- end_time = request.get_end_datetime()
- except:
- raise NexusProcessingException(
- reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
- code=400)
-
- if start_time > end_time:
- raise NexusProcessingException(
- reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
- request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
- code=400)
-
- depth_min = request.get_decimal_arg('depthMin', default=None)
- depth_max = request.get_decimal_arg('depthMax', default=None)
-
- if depth_min is not None and depth_max is not None and depth_min >= depth_max:
- raise NexusProcessingException(
- reason="Depth Min should be less than Depth Max", code=400)
-
- time_tolerance = request.get_int_arg('tt', default=86400)
- radius_tolerance = request.get_decimal_arg('rt', default=1000.0)
- platforms = request.get_argument('platforms', None)
- if platforms is None:
- raise NexusProcessingException(reason="'platforms' argument is required", code=400)
- try:
- p_validation = platforms.split(',')
- p_validation = [int(p) for p in p_validation]
- del p_validation
- except:
- raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)
-
- match_once = request.get_boolean_arg("matchOnce", default=False)
-
- result_size_limit = request.get_int_arg("resultSizeLimit", default=500)
-
- start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
- end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
-
- return bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
- start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
- depth_min, depth_max, time_tolerance, radius_tolerance, \
- platforms, match_once, result_size_limit
-
- def calc(self, request, **args):
- start = datetime.utcnow()
- # TODO Assuming Satellite primary
- bounding_polygon, primary_ds_name, matchup_ds_names, parameter_s, \
- start_time, start_seconds_from_epoch, end_time, end_seconds_from_epoch, \
- depth_min, depth_max, time_tolerance, radius_tolerance, \
- platforms, match_once, result_size_limit = self.parse_arguments(request)
-
- with ResultsStorage() as resultsStorage:
-
- execution_id = str(resultsStorage.insertExecution(None, start, None, None))
-
- self.log.debug("Querying for tiles in search domain")
- # Get tile ids in box
- tile_ids = [tile.tile_id for tile in
- self._get_tile_service().find_tiles_in_polygon(bounding_polygon, primary_ds_name,
- start_seconds_from_epoch, end_seconds_from_epoch,
- fetch_data=False, fl='id',
- sort=['tile_min_time_dt asc', 'tile_min_lon asc',
- 'tile_min_lat asc'], rows=5000)]
-
- # Call spark_matchup
- self.log.debug("Calling Spark Driver")
- try:
- spark_result = spark_matchup_driver(tile_ids, wkt.dumps(bounding_polygon), primary_ds_name,
- matchup_ds_names, parameter_s, depth_min, depth_max, time_tolerance,
- radius_tolerance, platforms, match_once, sc=self._sc)
- except Exception as e:
- self.log.exception(e)
- raise NexusProcessingException(reason="An unknown error occurred while computing matches", code=500)
-
- end = datetime.utcnow()
-
- self.log.debug("Building and saving results")
- args = {
- "primary": primary_ds_name,
- "matchup": matchup_ds_names,
- "startTime": start_time,
- "endTime": end_time,
- "bbox": request.get_argument('b'),
- "timeTolerance": time_tolerance,
- "radiusTolerance": float(radius_tolerance),
- "platforms": platforms,
- "parameter": parameter_s
- }
-
- if depth_min is not None:
- args["depthMin"] = float(depth_min)
-
- if depth_max is not None:
- args["depthMax"] = float(depth_max)
-
- total_keys = len(spark_result.keys())
- total_values = sum(len(v) for v in spark_result.itervalues())
- details = {
- "timeToComplete": int((end - start).total_seconds()),
- "numInSituRecords": 0,
- "numInSituMatched": total_values,
- "numGriddedChecked": 0,
- "numGriddedMatched": total_keys
- }
-
- matches = Matchup.convert_to_matches(spark_result)
-
- def do_result_insert():
- with ResultsStorage() as storage:
- storage.insertResults(results=matches, params=args, stats=details,
- startTime=start, completeTime=end, userEmail="",
- execution_id=execution_id)
-
- threading.Thread(target=do_result_insert).start()
-
- if 0 < result_size_limit < len(matches):
- result = DomsQueryResults(results=None, args=args, details=details, bounds=None, count=None,
- computeOptions=None, executionId=execution_id, status_code=202)
- else:
- result = DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None,
- computeOptions=None, executionId=execution_id)
-
- return result
-
- @classmethod
- def convert_to_matches(cls, spark_result):
- matches = []
- for primary_domspoint, matched_domspoints in spark_result.iteritems():
- p_matched = [cls.domspoint_to_dict(p_match) for p_match in matched_domspoints]
-
- primary = cls.domspoint_to_dict(primary_domspoint)
- primary['matches'] = list(p_matched)
- matches.append(primary)
- return matches
-
- @staticmethod
- def domspoint_to_dict(domspoint):
- return {
- "sea_water_temperature": domspoint.sst,
- "sea_water_temperature_depth": domspoint.sst_depth,
- "sea_water_salinity": domspoint.sss,
- "sea_water_salinity_depth": domspoint.sss_depth,
- "wind_speed": domspoint.wind_speed,
- "wind_direction": domspoint.wind_direction,
- "wind_u": domspoint.wind_u,
- "wind_v": domspoint.wind_v,
- "platform": doms_values.getPlatformById(domspoint.platform),
- "device": doms_values.getDeviceById(domspoint.device),
- "x": str(domspoint.longitude),
- "y": str(domspoint.latitude),
- "point": "Point(%s %s)" % (domspoint.longitude, domspoint.latitude),
- "time": datetime.strptime(domspoint.time, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=UTC),
- "fileurl": domspoint.file_url,
- "id": domspoint.data_id,
- "source": domspoint.source,
- }
-
-
-class DomsPoint(object):
- def __init__(self, longitude=None, latitude=None, time=None, depth=None, data_id=None):
-
- self.time = time
- self.longitude = longitude
- self.latitude = latitude
- self.depth = depth
- self.data_id = data_id
-
- self.wind_u = None
- self.wind_v = None
- self.wind_direction = None
- self.wind_speed = None
- self.sst = None
- self.sst_depth = None
- self.sss = None
- self.sss_depth = None
- self.source = None
- self.depth = None
- self.platform = None
- self.device = None
- self.file_url = None
-
- def __repr__(self):
- return str(self.__dict__)
-
- @staticmethod
- def from_nexus_point(nexus_point, tile=None, parameter='sst'):
- point = DomsPoint()
-
- point.data_id = "%s[%s]" % (tile.tile_id, nexus_point.index)
-
- # TODO Not an ideal solution; but it works for now.
- if parameter == 'sst':
- point.sst = nexus_point.data_val.item()
- elif parameter == 'sss':
- point.sss = nexus_point.data_val.item()
- elif parameter == 'wind':
- point.wind_u = nexus_point.data_val.item()
- try:
- point.wind_v = tile.meta_data['wind_v'][tuple(nexus_point.index)].item()
- except (KeyError, IndexError):
- pass
- try:
- point.wind_direction = tile.meta_data['wind_dir'][tuple(nexus_point.index)].item()
- except (KeyError, IndexError):
- pass
- try:
- point.wind_speed = tile.meta_data['wind_speed'][tuple(nexus_point.index)].item()
- except (KeyError, IndexError):
- pass
- else:
- raise NotImplementedError('%s not supported. Only sst, sss, and wind parameters are supported.' % parameter)
-
- point.longitude = nexus_point.longitude.item()
- point.latitude = nexus_point.latitude.item()
-
- point.time = datetime.utcfromtimestamp(nexus_point.time).strftime('%Y-%m-%dT%H:%M:%SZ')
-
- try:
- point.depth = nexus_point.depth
- except KeyError:
- # No depth associated with this measurement
- pass
-
- point.sst_depth = 0
- point.source = tile.dataset
- point.file_url = tile.granule
-
- # TODO device should change based on the satellite making the observations.
- point.platform = 9
- point.device = 5
- return point
-
- @staticmethod
- def from_edge_point(edge_point):
- point = DomsPoint()
-
- try:
- x, y = wkt.loads(edge_point['point']).coords[0]
- except WKTReadingError:
- try:
- x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
- except ValueError:
- y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
-
- point.longitude = x
- point.latitude = y
-
- point.time = edge_point['time']
-
- point.wind_u = edge_point.get('eastward_wind')
- point.wind_v = edge_point.get('northward_wind')
- point.wind_direction = edge_point.get('wind_direction')
- point.wind_speed = edge_point.get('wind_speed')
- point.sst = edge_point.get('sea_water_temperature')
- point.sst_depth = edge_point.get('sea_water_temperature_depth')
- point.sss = edge_point.get('sea_water_salinity')
- point.sss_depth = edge_point.get('sea_water_salinity_depth')
- point.source = edge_point.get('source')
- point.platform = edge_point.get('platform')
- point.device = edge_point.get('device')
- point.file_url = edge_point.get('fileurl')
-
- try:
- point.data_id = unicode(edge_point['id'])
- except KeyError:
- point.data_id = "%s:%s:%s" % (point.time, point.longitude, point.latitude)
-
- return point
-
-
-from threading import Lock
-
-DRIVER_LOCK = Lock()
-
-
-def spark_matchup_driver(tile_ids, bounding_wkt, primary_ds_name, matchup_ds_names, parameter, depth_min, depth_max,
- time_tolerance, radius_tolerance, platforms, match_once, sc=None):
- from functools import partial
-
- with DRIVER_LOCK:
- # Broadcast parameters
- primary_b = sc.broadcast(primary_ds_name)
- matchup_b = sc.broadcast(matchup_ds_names)
- depth_min_b = sc.broadcast(float(depth_min) if depth_min is not None else None)
- depth_max_b = sc.broadcast(float(depth_max) if depth_max is not None else None)
- tt_b = sc.broadcast(time_tolerance)
- rt_b = sc.broadcast(float(radius_tolerance))
- platforms_b = sc.broadcast(platforms)
- bounding_wkt_b = sc.broadcast(bounding_wkt)
- parameter_b = sc.broadcast(parameter)
-
- # Parallelize list of tile ids
- rdd = sc.parallelize(tile_ids, determine_parllelism(len(tile_ids)))
-
- # Map Partitions ( list(tile_id) )
- rdd_filtered = rdd.mapPartitions(
- partial(match_satellite_to_insitu, primary_b=primary_b, matchup_b=matchup_b, parameter_b=parameter_b, tt_b=tt_b,
- rt_b=rt_b, platforms_b=platforms_b, bounding_wkt_b=bounding_wkt_b, depth_min_b=depth_min_b,
- depth_max_b=depth_max_b), preservesPartitioning=True) \
- .filter(lambda p_m_tuple: abs(
- iso_time_to_epoch(p_m_tuple[0].time) - iso_time_to_epoch(p_m_tuple[1].time)) <= time_tolerance)
-
- if match_once:
- # Only the 'nearest' point for each primary should be returned. Add an extra map/reduce which calculates
- # the distance and finds the minimum
-
- # Method used for calculating the distance between 2 DomsPoints
- from pyproj import Geod
-
- def dist(primary, matchup):
- wgs84_geod = Geod(ellps='WGS84')
- lat1, lon1 = (primary.latitude, primary.longitude)
- lat2, lon2 = (matchup.latitude, matchup.longitude)
- az12, az21, distance = wgs84_geod.inv(lon1, lat1, lon2, lat2)
- return distance
-
- rdd_filtered = rdd_filtered \
- .map(lambda (primary, matchup): tuple([primary, tuple([matchup, dist(primary, matchup)])])) \
- .reduceByKey(lambda match_1, match_2: match_1 if match_1[1] < match_2[1] else match_2) \
- .mapValues(lambda x: [x[0]])
- else:
- rdd_filtered = rdd_filtered \
- .combineByKey(lambda value: [value], # Create 1 element list
- lambda value_list, value: value_list + [value], # Add 1 element to list
- lambda value_list_a, value_list_b: value_list_a + value_list_b) # Add two lists together
-
- result_as_map = rdd_filtered.collectAsMap()
-
- return result_as_map
-
-
-def determine_parllelism(num_tiles):
- """
- Try to stay at a maximum of 140 tiles per partition; But don't go over 128 partitions.
- Also, don't go below the default of 8
- """
- num_partitions = max(min(num_tiles / 140, 128), 8)
- return num_partitions
-
-
-def add_meters_to_lon_lat(lon, lat, meters):
- """
- Uses a simple approximation of
- 1 degree latitude = 111,111 meters
- 1 degree longitude = 111,111 meters * cosine(latitude)
- :param lon: longitude to add meters to
- :param lat: latitude to add meters to
- :param meters: meters to add to the longitude and latitude values
- :return: (longitude, latitude) increased by given meters
- """
- longitude = lon + ((meters / 111111) * cos(radians(lat)))
- latitude = lat + (meters / 111111)
-
- return longitude, latitude
-
-
-def match_satellite_to_insitu(tile_ids, primary_b, matchup_b, parameter_b, tt_b, rt_b, platforms_b,
- bounding_wkt_b, depth_min_b, depth_max_b):
- the_time = datetime.now()
- tile_ids = list(tile_ids)
- if len(tile_ids) == 0:
- return []
- tile_service = NexusTileService()
-
- # Determine the spatial temporal extents of this partition of tiles
- tiles_bbox = tile_service.get_bounding_box(tile_ids)
- tiles_min_time = tile_service.get_min_time(tile_ids)
- tiles_max_time = tile_service.get_max_time(tile_ids)
-
- # Increase spatial extents by the radius tolerance
- matchup_min_lon, matchup_min_lat = add_meters_to_lon_lat(tiles_bbox.bounds[0], tiles_bbox.bounds[1],
- -1 * rt_b.value)
- matchup_max_lon, matchup_max_lat = add_meters_to_lon_lat(tiles_bbox.bounds[2], tiles_bbox.bounds[3], rt_b.value)
-
- # Don't go outside of the search domain
- search_min_x, search_min_y, search_max_x, search_max_y = wkt.loads(bounding_wkt_b.value).bounds
- matchup_min_lon = max(matchup_min_lon, search_min_x)
- matchup_min_lat = max(matchup_min_lat, search_min_y)
- matchup_max_lon = min(matchup_max_lon, search_max_x)
- matchup_max_lat = min(matchup_max_lat, search_max_y)
-
- # Find the centroid of the matchup bounding box and initialize the projections
- matchup_center = box(matchup_min_lon, matchup_min_lat, matchup_max_lon, matchup_max_lat).centroid.coords[0]
- aeqd_proj = pyproj.Proj(proj='aeqd', lon_0=matchup_center[0], lat_0=matchup_center[1])
- lonlat_proj = pyproj.Proj(proj='lonlat')
-
- # Increase temporal extents by the time tolerance
- matchup_min_time = tiles_min_time - tt_b.value
- matchup_max_time = tiles_max_time + tt_b.value
- print "%s Time to determine spatial-temporal extents for partition %s to %s" % (
- str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
-
- # Query edge for all points within the spatial-temporal extents of this partition
- the_time = datetime.now()
- edge_session = requests.Session()
- edge_results = []
- with edge_session:
- for insitudata_name in matchup_b.value.split(','):
- bbox = ','.join(
- [str(matchup_min_lon), str(matchup_min_lat), str(matchup_max_lon), str(matchup_max_lat)])
- edge_response = query_edge(insitudata_name, parameter_b.value, matchup_min_time, matchup_max_time, bbox,
- platforms_b.value, depth_min_b.value, depth_max_b.value, session=edge_session)
- if edge_response['totalResults'] == 0:
- continue
- r = edge_response['results']
- for p in r:
- p['source'] = insitudata_name
- edge_results.extend(r)
- print "%s Time to call edge for partition %s to %s" % (str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
- if len(edge_results) == 0:
- return []
-
- # Convert edge points to utm
- the_time = datetime.now()
- matchup_points = np.ndarray((len(edge_results), 2), dtype=np.float32)
- for n, edge_point in enumerate(edge_results):
- try:
- x, y = wkt.loads(edge_point['point']).coords[0]
- except WKTReadingError:
- try:
- x, y = Point(*[float(c) for c in edge_point['point'].split(' ')]).coords[0]
- except ValueError:
- y, x = Point(*[float(c) for c in edge_point['point'].split(',')]).coords[0]
-
- matchup_points[n][0], matchup_points[n][1] = pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=x, y=y)
- print "%s Time to convert match points for partition %s to %s" % (
- str(datetime.now() - the_time), tile_ids[0], tile_ids[-1])
-
- # Build kdtree from matchup points
- the_time = datetime.now()
- m_tree = spatial.cKDTree(matchup_points, leafsize=30)
- print "%s Time to build matchup tree" % (str(datetime.now() - the_time))
-
- # The actual matching happens in the generator. This is so that we only load 1 tile into memory at a time
- match_generators = [match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, bounding_wkt_b.value,
- parameter_b.value, rt_b.value, lonlat_proj, aeqd_proj) for tile_id
- in tile_ids]
-
- return chain(*match_generators)
-
-
-def match_tile_to_point_generator(tile_service, tile_id, m_tree, edge_results, search_domain_bounding_wkt,
- search_parameter, radius_tolerance, lonlat_proj, aeqd_proj):
- from nexustiles.model.nexusmodel import NexusPoint
- from webservice.algorithms_spark.Matchup import DomsPoint # Must import DomsPoint or Spark complains
-
- # Load tile
- try:
- the_time = datetime.now()
- tile = tile_service.mask_tiles_to_polygon(wkt.loads(search_domain_bounding_wkt),
- tile_service.find_tile_by_id(tile_id))[0]
- print "%s Time to load tile %s" % (str(datetime.now() - the_time), tile_id)
- except IndexError:
- # This should only happen if all measurements in a tile become masked after applying the bounding polygon
- raise StopIteration
-
- # Convert valid tile lat,lon tuples to UTM tuples
- the_time = datetime.now()
- # Get list of indices of valid values
- valid_indices = tile.get_indices()
- primary_points = np.array(
- [pyproj.transform(p1=lonlat_proj, p2=aeqd_proj, x=tile.longitudes[aslice[2]], y=tile.latitudes[aslice[1]]) for
- aslice in valid_indices])
-
- print "%s Time to convert primary points for tile %s" % (str(datetime.now() - the_time), tile_id)
-
- a_time = datetime.now()
- p_tree = spatial.cKDTree(primary_points, leafsize=30)
- print "%s Time to build primary tree" % (str(datetime.now() - a_time))
-
- a_time = datetime.now()
- matched_indexes = p_tree.query_ball_tree(m_tree, radius_tolerance)
- print "%s Time to query primary tree for tile %s" % (str(datetime.now() - a_time), tile_id)
- for i, point_matches in enumerate(matched_indexes):
- if len(point_matches) > 0:
- p_nexus_point = NexusPoint(tile.latitudes[valid_indices[i][1]],
- tile.longitudes[valid_indices[i][2]], None,
- tile.times[valid_indices[i][0]], valid_indices[i],
- tile.data[tuple(valid_indices[i])])
- p_doms_point = DomsPoint.from_nexus_point(p_nexus_point, tile=tile, parameter=search_parameter)
- for m_point_index in point_matches:
- m_doms_point = DomsPoint.from_edge_point(edge_results[m_point_index])
- yield p_doms_point, m_doms_point
-
-
-def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, itemsPerPage=1000,
- startIndex=0, stats=True, session=None):
- try:
- startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
- except TypeError:
- # Assume we were passed a properly formatted string
- pass
-
- try:
- endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ')
- except TypeError:
- # Assume we were passed a properly formatted string
- pass
-
- try:
- platform = platform.split(',')
- except AttributeError:
- # Assume we were passed a list
- pass
-
- params = {"variable": variable,
- "startTime": startTime,
- "endTime": endTime,
- "bbox": bbox,
- "minDepth": depth_min,
- "maxDepth": depth_max,
- "platform": platform,
- "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}
-
- if session is not None:
- edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
- else:
- edge_request = requests.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)
-
- edge_request.raise_for_status()
- edge_response = json.loads(edge_request.text)
-
- # Get all edge results
- next_page_url = edge_response.get('next', None)
- while next_page_url is not None:
- if session is not None:
- edge_page_request = session.get(next_page_url)
- else:
- edge_page_request = requests.get(next_page_url)
-
- edge_page_request.raise_for_status()
- edge_page_response = json.loads(edge_page_request.text)
-
- edge_response['results'].extend(edge_page_response['results'])
-
- next_page_url = edge_page_response.get('next', None)
-
- return edge_response
diff --git a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
index 12b84c1..9e77887 100644
--- a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
+++ b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py
@@ -6,6 +6,7 @@ from webservice.algorithms.NexusCalcHandler import NexusCalcHandler
from webservice.metrics import MetricsRecord, SparkAccumulatorMetricsField, NumberMetricsField
from webservice.webmodel import NexusProcessingException
+logger = logging.getLogger(__name__)
class NexusCalcSparkHandler(NexusCalcHandler):
class SparkJobContext(object):
diff --git a/analysis/webservice/algorithms_spark/__init__.py b/analysis/webservice/algorithms_spark/__init__.py
index d6ed83f..a25c8d5 100644
--- a/analysis/webservice/algorithms_spark/__init__.py
+++ b/analysis/webservice/algorithms_spark/__init__.py
@@ -20,7 +20,6 @@ import ClimMapSpark
import CorrMapSpark
import DailyDifferenceAverageSpark
import HofMoellerSpark
-import Matchup
import MaximaMinimaSpark
import NexusCalcSparkHandler
import TimeAvgMapSpark
@@ -47,11 +46,6 @@ if module_exists("pyspark"):
pass
try:
- import Matchup
- except ImportError:
- pass
-
- try:
import TimeAvgMapSpark
except ImportError:
pass
diff --git a/analysis/webservice/config/web.ini b/analysis/webservice/config/web.ini
index 2644ade..a1ecb2c 100644
--- a/analysis/webservice/config/web.ini
+++ b/analysis/webservice/config/web.ini
@@ -14,4 +14,4 @@ static_enabled=true
static_dir=static
[modules]
-module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms
\ No newline at end of file
+module_dirs=webservice.algorithms,webservice.algorithms_spark
\ No newline at end of file
diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py
index ad7a773..c8a6852 100644
--- a/analysis/webservice/webapp.py
+++ b/analysis/webservice/webapp.py
@@ -69,6 +69,7 @@ if __name__ == "__main__":
help='time out for solr requests in seconds, default (60) is ok for most deployments'
' when solr performances are not good this might need to be increased')
define('solr_host', help='solr host and port')
+ define('cassandra_host', help='cassandra host')
parse_command_line()
algorithm_config = inject_args_in_config(options, algorithm_config)
@@ -114,7 +115,7 @@ if __name__ == "__main__":
else:
handlers.append(
(clazzWrapper.path, NexusRequestHandler,
- dict(clazz=clazzWrapper, thread_pool=request_thread_pool)))
+ dict(clazz=clazzWrapper, algorithm_config=algorithm_config, thread_pool=request_thread_pool)))
class VersionHandler(tornado.web.RequestHandler):
diff --git a/data-access/nexustiles/config/datastores.ini.default b/data-access/nexustiles/config/datastores.ini.default
index 0fe8d9d..e4ab6c1 100644
--- a/data-access/nexustiles/config/datastores.ini.default
+++ b/data-access/nexustiles/config/datastores.ini.default
@@ -1,5 +1,5 @@
[cassandra]
-host=sdap-cassandra
+host=localhost
port=9042
keyspace=nexustiles
local_datacenter=datacenter1
@@ -15,7 +15,7 @@ table=nexus-jpl-table
region=us-west-2
[solr]
-host=sdap-solr:8983
+host=localhost:8983
core=nexustiles
[datastore]
diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py
index 24db1ae..3e7e2f8 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -102,10 +102,10 @@ class NexusTileService(object):
def override_config(self, config):
for section in config.sections():
- if self._config.has_section(section): # only override preexisting section, ignores the other
+ if self._config.has_section(section): # only override preexisting section, ignores the other
for option in config.options(section):
- self._config.set(section, option, config.get(section, option))
-
+ if config.get(section, option) is not None:
+ self._config.set(section, option, config.get(section, option))
def get_dataseries_list(self, simple=False):
if simple:
diff --git a/helm/templates/webapp.yml b/helm/templates/webapp.yml
index e363ab7..4af0e38 100644
--- a/helm/templates/webapp.yml
+++ b/helm/templates/webapp.yml
@@ -9,7 +9,7 @@ spec:
pythonVersion: "2"
mode: cluster
image: {{ .Values.webapp.distributed.image }}
- imagePullPolicy: Always
+ imagePullPolicy: IfNotPresent
mainApplicationFile: local:///incubator-sdap-nexus/analysis/webservice/webapp.py
arguments:
- "--solr-host={{ .Release.Name }}-solr-svc:8983"
diff --git a/tools/doms/README.md b/tools/doms/README.md
deleted file mode 100644
index c49fa4a..0000000
--- a/tools/doms/README.md
+++ /dev/null
@@ -1,66 +0,0 @@
-# doms_reader.py
-The functions in doms_reader.py read a DOMS netCDF file into memory, assemble a list of matches of satellite and in situ data, and optionally output the matches to a CSV file. Each matched pair contains one satellite data record and one in situ data record.
-
-The DOMS netCDF files hold satellite data and in situ data in different groups (`SatelliteData` and `InsituData`). The `matchIDs` netCDF variable contains pairs of IDs (matches) which reference a satellite data record and an in situ data record in their respective groups. These records have a many-to-many relationship; one satellite record may match to many in situ records, and one in situ record may match to many satellite records. The `assemble_matches` function assembles the individua [...]
-
-## Requirements
-This tool was developed and tested with Python 2.7.5 and 3.7.0a0.
-Imported packages:
-* argparse
-* netcdf4
-* sys
-* datetime
-* csv
-* collections
-* logging
-
-
-## Functions
-### Function: `assemble_matches(filename)`
-Read a DOMS netCDF file into memory and return a list of matches from the file.
-
-#### Parameters
-- `filename` (str): the DOMS netCDF file name.
-
-#### Returns
-- `matches` (list): List of matches.
-
-Each list element in `matches` is a dictionary organized as follows:
- For match `m`, netCDF group `GROUP` ('SatelliteData' or 'InsituData'), and netCDF group variable `VARIABLE`:
-
-`matches[m][GROUP]['matchID']`: netCDF `MatchedRecords` dimension ID for the match
-`matches[m][GROUP]['GROUPID']`: GROUP netCDF `dim` dimension ID for the record
-`matches[m][GROUP][VARIABLE]`: variable value
-
-For example, to access the timestamps of the satellite data and the in situ data of the first match in the list, along with the `MatchedRecords` dimension ID and the groups' `dim` dimension ID:
-```python
-matches[0]['SatelliteData']['time']
-matches[0]['InsituData']['time']
-matches[0]['SatelliteData']['matchID']
-matches[0]['SatelliteData']['SatelliteDataID']
-matches[0]['InsituData']['InsituDataID']
-```
-
-
-### Function: `matches_to_csv(matches, csvfile)`
-Write the DOMS matches to a CSV file. Include a header of column names which are based on the group and variable names from the netCDF file.
-
-#### Parameters:
-- `matches` (list): the list of dictionaries containing the DOMS matches as returned from the `assemble_matches` function.
-- `csvfile` (str): the name of the CSV output file.
-
-## Usage
-For example, to read some DOMS netCDF file called `doms_file.nc`:
-### Command line
-The main function for `doms_reader.py` takes one `filename` parameter (`doms_file.nc` argument in this example) for the DOMS netCDF file to read, calls the `assemble_matches` function, then calls the `matches_to_csv` function to write the matches to a CSV file `doms_matches.csv`.
-```
-python doms_reader.py doms_file.nc
-```
-```
-python3 doms_reader.py doms_file.nc
-```
-### Importing `assemble_matches`
-```python
-from doms_reader import assemble_matches
-matches = assemble_matches('doms_file.nc')
-```
diff --git a/tools/doms/doms_reader.py b/tools/doms/doms_reader.py
deleted file mode 100644
index c8229c4..0000000
--- a/tools/doms/doms_reader.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import argparse
-from netCDF4 import Dataset, num2date
-import sys
-import datetime
-import csv
-from collections import OrderedDict
-import logging
-
-LOGGER = logging.getLogger("doms_reader")
-
-def assemble_matches(filename):
- """
- Read a DOMS netCDF file and return a list of matches.
-
- Parameters
- ----------
- filename : str
- The DOMS netCDF file name.
-
- Returns
- -------
- matches : list
- List of matches. Each list element is a dictionary.
- For match m, netCDF group GROUP (SatelliteData or InsituData), and
- group variable VARIABLE:
- matches[m][GROUP]['matchID']: MatchedRecords dimension ID for the match
- matches[m][GROUP]['GROUPID']: GROUP dim dimension ID for the record
- matches[m][GROUP][VARIABLE]: variable value
- """
-
- try:
- # Open the netCDF file
- with Dataset(filename, 'r') as doms_nc:
- # Check that the number of groups is consistent w/ the MatchedGroups
- # dimension
- assert len(doms_nc.groups) == doms_nc.dimensions['MatchedGroups'].size,\
- ("Number of groups isn't the same as MatchedGroups dimension.")
-
- matches = []
- matched_records = doms_nc.dimensions['MatchedRecords'].size
-
- # Loop through the match IDs to assemble matches
- for match in range(0, matched_records):
- match_dict = OrderedDict()
- # Grab the data from each platform (group) in the match
- for group_num, group in enumerate(doms_nc.groups):
- match_dict[group] = OrderedDict()
- match_dict[group]['matchID'] = match
- ID = doms_nc.variables['matchIDs'][match][group_num]
- match_dict[group][group + 'ID'] = ID
- for var in doms_nc.groups[group].variables.keys():
- match_dict[group][var] = doms_nc.groups[group][var][ID]
-
- # Create a UTC datetime field from timestamp
- dt = num2date(match_dict[group]['time'],
- doms_nc.groups[group]['time'].units)
- match_dict[group]['datetime'] = dt
- LOGGER.info(match_dict)
- matches.append(match_dict)
-
- return matches
- except (OSError, IOError) as err:
- LOGGER.exception("Error reading netCDF file " + filename)
- raise err
-
-def matches_to_csv(matches, csvfile):
- """
- Write the DOMS matches to a CSV file. Include a header of column names
- which are based on the group and variable names from the netCDF file.
-
- Parameters
- ----------
- matches : list
- The list of dictionaries containing the DOMS matches as returned from
- assemble_matches.
- csvfile : str
- The name of the CSV output file.
- """
- # Create a header for the CSV. Column names are GROUP_VARIABLE or
- # GROUP_GROUPID.
- header = []
- for key, value in matches[0].items():
- for otherkey in value.keys():
- header.append(key + "_" + otherkey)
-
- try:
- # Write the CSV file
- with open(csvfile, 'w') as output_file:
- csv_writer = csv.writer(output_file)
- csv_writer.writerow(header)
- for match in matches:
- row = []
- for group, data in match.items():
- for value in data.values():
- row.append(value)
- csv_writer.writerow(row)
- except (OSError, IOError) as err:
- LOGGER.exception("Error writing CSV file " + csvfile)
- raise err
-
-if __name__ == '__main__':
- """
- Execution:
- python doms_reader.py filename
- OR
- python3 doms_reader.py filename
- """
- logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s',
- level=logging.INFO,
- datefmt='%Y-%m-%d %H:%M:%S')
-
- p = argparse.ArgumentParser()
- p.add_argument('filename', help='DOMS netCDF file to read')
- args = p.parse_args()
-
- doms_matches = assemble_matches(args.filename)
-
- matches_to_csv(doms_matches, 'doms_matches.csv')
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file