You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by jj...@apache.org on 2018/09/17 22:52:56 UTC
[incubator-sdap-nexus] branch master updated: SDAP-50 Fix incorrect
Hovmoller Implementation (#28)
This is an automated email from the ASF dual-hosted git repository.
jjacob pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new 1997f20 SDAP-50 Fix incorrect Hovmoller Implementation (#28)
1997f20 is described below
commit 1997f20a615b6475a8b882dc591780a3f4b720a9
Author: Joseph Jacob <jj...@users.noreply.github.com>
AuthorDate: Mon Sep 17 15:52:52 2018 -0700
SDAP-50 Fix incorrect Hovmoller Implementation (#28)
This update combines Hovmoller calculation across NEXUS tiles, combines duplicate time stamps, and adds cos(lat) weighting for latitude-averaging.
---
.../webservice/algorithms_spark/HofMoellerSpark.py | 248 +++++++++++++--------
1 file changed, 160 insertions(+), 88 deletions(-)
diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index 63fbaa4..92d7a70 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import sys
import itertools
import logging
import traceback
@@ -35,82 +35,64 @@ LATITUDE = 0
LONGITUDE = 1
-class LongitudeHofMoellerCalculator(object):
- @staticmethod
- def longitude_time_hofmoeller_stats(tile_in_spark):
-
- (tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark
-
- tile_service = NexusTileService()
- try:
- # Load the dataset tile
- tile = tile_service.find_tile_by_id(tile_id)[0]
- # Mask it to the search domain
- tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, [tile])[0]
- except IndexError:
- return None
-
- stat = {
- 'sequence': index,
- 'time': np.ma.min(tile.times),
- 'lons': []
- }
- points = list(tile.nexus_point_generator())
- data = sorted(points, key=lambda p: p.longitude)
- points_by_lon = itertools.groupby(data, key=lambda p: p.longitude)
-
- for lon, points_at_lon in points_by_lon:
- values_at_lon = np.array([point.data_val for point in points_at_lon])
- stat['lons'].append({
- 'longitude': float(lon),
- 'cnt': len(values_at_lon),
- 'avg': np.mean(values_at_lon).item(),
- 'max': np.max(values_at_lon).item(),
- 'min': np.min(values_at_lon).item(),
- 'std': np.std(values_at_lon).item()
- })
-
- return stat
-
-
-class LatitudeHofMoellerCalculator(object):
+class HofMoellerCalculator(object):
@staticmethod
- def latitude_time_hofmoeller_stats(tile_in_spark):
+ def hofmoeller_stats(tile_in_spark):
- (tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark
+ (latlon, tile_id, index,
+ min_lat, max_lat, min_lon, max_lon) = tile_in_spark
tile_service = NexusTileService()
try:
# Load the dataset tile
tile = tile_service.find_tile_by_id(tile_id)[0]
# Mask it to the search domain
- tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, [tile])[0]
+ tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat,
+ min_lon, max_lon, [tile])[0]
except IndexError:
- return None
+ #return None
+ return []
- stat = {
- 'sequence': index,
- 'time': np.ma.min(tile.times),
- 'lats': []
- }
+ t = np.ma.min(tile.times)
+ stats = []
points = list(tile.nexus_point_generator())
- data = sorted(points, key=lambda p: p.latitude)
- points_by_lat = itertools.groupby(data, key=lambda p: p.latitude)
-
- for lat, points_at_lat in points_by_lat:
- values_at_lat = np.array([point.data_val for point in points_at_lat])
-
- stat['lats'].append({
- 'latitude': float(lat),
- 'cnt': len(values_at_lat),
- 'avg': np.mean(values_at_lat).item(),
- 'max': np.max(values_at_lat).item(),
- 'min': np.min(values_at_lat).item(),
- 'std': np.std(values_at_lat).item()
- })
+ if latlon == 0:
+ # Latitude-Time Map (Average over longitudes)
+ data = sorted(points, key=lambda p: p.latitude)
+ points_by_coord = itertools.groupby(data, key=lambda p: p.latitude)
+ else:
+ # Longitude-Time Map (Average over latitudes)
+ data = sorted(points, key=lambda p: p.longitude)
+ points_by_coord = itertools.groupby(data, key=lambda p: p.longitude)
+
+
+ for coord, points_at_coord in points_by_coord:
+ values_at_coord = np.array([[p.data_val,
+ np.cos(np.radians(p.latitude))]
+ for p in points_at_coord])
+ vals = np.nan_to_num(values_at_coord[:,0])
+ weights = values_at_coord[:,1]
+ coord_cnt = len(values_at_coord)
+ if latlon == 0:
+ # Latitude-Time Map (Average over longitudes)
+ # In this case there is no weighting by cos(lat)
+ weighted_sum = np.sum(vals).item()
+ sum_of_weights = coord_cnt
+ else:
+ # Longitude-Time Map (Average over latitudes)
+ # In this case we need to weight by cos(lat)
+ weighted_sum = np.dot(vals, weights)
+ sum_of_weights = np.sum(weights).item()
- return stat
+ stats.append(((t, float(coord)), (t, index, float(coord),
+ coord_cnt,
+ weighted_sum,
+ sum_of_weights,
+ np.max(vals).item(),
+ np.min(vals).item(),
+ np.var(vals).item())))
+ return stats
class BaseHoffMoellerHandlerImpl(SparkHandler):
@@ -150,6 +132,95 @@ def determine_parllelism(num_tiles):
return num_partitions
+def parallel_variance(avg_a, count_a, var_a, avg_b, count_b, var_b):
+ # Thanks Wikipedia https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
+ delta = avg_b - avg_a
+ m_a = var_a * (count_a - 1)
+ m_b = var_b * (count_b - 1)
+ M2 = m_a + m_b + delta ** 2 * count_a * count_b / (count_a + count_b)
+ return M2 / (count_a + count_b - 1)
+
+
+def hof_tuple_time(t):
+ return t[0]
+
+
+def hof_tuple_combine(t1, t2):
+ return (t1[0], # Time
+ t1[1], # Sequence (index)
+ t1[2], # Coordinate on axis (latitude or longitude)
+ t1[3] + t2[3], # Number of values
+ t1[4] + t2[4], # Sum of values (weighted for lon-time maps)
+ t1[5] + t2[5], # Sum of weights (= # of values for lat-time maps)
+ max(t1[6], t2[6]), # Maximum value
+ min(t1[7], t2[7]), # Minimum value
+ parallel_variance(t1[4]/t1[5], t1[3], t1[8],
+ t2[4]/t2[5], t2[3], t2[8])) # Variance
+
+def hof_tuple_to_dict(t, avg_var_name):
+ return {avg_var_name: t[2],
+ 'cnt': t[3],
+ 'avg': t[4] / t[5],
+ 'std': np.sqrt(t[8]),
+ 'max': t[6],
+ 'min': t[7]}
+
+def spark_driver(sc, latlon, nexus_tiles_spark):
+ # Parallelize list of tile ids
+ rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
+ if latlon == 0:
+ # Latitude-Time Map (Average over longitudes)
+ avg_var_name = 'latitude'
+ avg_var_name_collection = 'lats'
+ else:
+ # Longitude-Time Map (Average over latitudes)
+ avg_var_name = 'longitude'
+ avg_var_name_collection = 'lons'
+
+ # Create a set of key-value pairs where the key is (time, lat|lon) and
+ # the value is a tuple of intermediate statistics for the specified
+ # coordinate within a single NEXUS tile.
+ results = rdd.flatMap(HofMoellerCalculator.hofmoeller_stats)
+
+ # Combine tuples across tiles with input key = (time, lat|lon)
+ # Output a key value pair with key = (time)
+ results = results.combineByKey(lambda val: (hof_tuple_time(val),val),
+ lambda x, val: (hof_tuple_time(x),
+ hof_tuple_combine(x[1],
+ val)),
+ lambda x, y: (hof_tuple_time(x),
+ hof_tuple_combine(x[1],
+ y[1])))
+
+ # Convert the tuples to dictionary entries and combine coordinates
+ # with the same time stamp. Here we have input key = (time)
+ results = results.values().\
+ combineByKey(lambda val, avg_var_name=avg_var_name,
+ avg_var_name_collection=avg_var_name_collection: {
+ 'sequence': val[1],
+ 'time': val[0],
+ avg_var_name_collection: [
+ hof_tuple_to_dict(val, avg_var_name)]},
+ lambda x, val, avg_var_name=avg_var_name,
+ avg_var_name_collection=avg_var_name_collection: {
+ 'sequence': x['sequence'],
+ 'time': x['time'],
+ avg_var_name_collection: (
+ x[avg_var_name_collection] +
+ [hof_tuple_to_dict(val, avg_var_name)])},
+ lambda x, y,
+ avg_var_name_collection=avg_var_name_collection:
+ {'sequence': x['sequence'],
+ 'time': x['time'],
+ avg_var_name_collection: (
+ x[avg_var_name_collection] +
+ y[avg_var_name_collection])}).\
+ values().\
+ collect()
+
+ return results
+
+
@nexus_handler
class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
name = "Latitude/Time HofMoeller Spark"
@@ -159,27 +230,27 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
singleton = True
def __init__(self):
+ self._latlon = 0 # 0 for latitude-time map, 1 for longitude-time map
BaseHoffMoellerHandlerImpl.__init__(self)
def calc(self, computeOptions, **args):
- nexus_tiles_spark = [(tile.tile_id, x, computeOptions.get_min_lat(), computeOptions.get_max_lat(),
- computeOptions.get_min_lon(), computeOptions.get_max_lon()) for x, tile in enumerate(
- self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
- computeOptions.get_min_lon(), computeOptions.get_max_lon(),
- computeOptions.get_dataset()[0],
- computeOptions.get_start_time(),
- computeOptions.get_end_time(), fetch_data=False))]
-
+ nexus_tiles_spark = [(self._latlon, tile.tile_id, x,
+ computeOptions.get_min_lat(),
+ computeOptions.get_max_lat(),
+ computeOptions.get_min_lon(),
+ computeOptions.get_max_lon())
+ for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon(), computeOptions.get_dataset()[0], computeOptions.get_start_time(), computeOptions.get_end_time(), fetch_data=False))]
+ print ("Got {} tiles".format(len(nexus_tiles_spark)))
if len(nexus_tiles_spark) == 0:
raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
- # Parallelize list of tile ids
- rdd = self._sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
- results = rdd.map(LatitudeHofMoellerCalculator.latitude_time_hofmoeller_stats).collect()
-
+ results = spark_driver (self._sc, self._latlon, nexus_tiles_spark)
+
results = filter(None, results)
- results = sorted(results, key=lambda entry: entry["time"])
-
+ results = sorted(results, key=lambda entry: entry['time'])
+ for i in range(len(results)):
+ results[i]['lats'] = sorted(results[i]['lats'],
+ key=lambda entry: entry['latitude'])
results = self.applyDeseasonToHofMoeller(results)
result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LATITUDE)
@@ -195,26 +266,27 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
singleton = True
def __init__(self):
+ self._latlon = 1 # 0 for latitude-time map; 1 for longitude-time map
BaseHoffMoellerHandlerImpl.__init__(self)
def calc(self, computeOptions, **args):
- nexus_tiles_spark = [(tile.tile_id, x, computeOptions.get_min_lat(), computeOptions.get_max_lat(),
- computeOptions.get_min_lon(), computeOptions.get_max_lon()) for x, tile in enumerate(
- self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
- computeOptions.get_min_lon(), computeOptions.get_max_lon(),
- computeOptions.get_dataset()[0],
- computeOptions.get_start_time(),
- computeOptions.get_end_time(), fetch_data=False))]
+ nexus_tiles_spark = [(self._latlon, tile.tile_id, x,
+ computeOptions.get_min_lat(),
+ computeOptions.get_max_lat(),
+ computeOptions.get_min_lon(),
+ computeOptions.get_max_lon())
+ for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon(), computeOptions.get_dataset()[0], computeOptions.get_start_time(), computeOptions.get_end_time(), fetch_data=False))]
if len(nexus_tiles_spark) == 0:
raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
- # Parallelize list of tile ids
- rdd = self._sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
- results = rdd.map(LongitudeHofMoellerCalculator.longitude_time_hofmoeller_stats).collect()
+ results = spark_driver (self._sc, self._latlon, nexus_tiles_spark)
results = filter(None, results)
results = sorted(results, key=lambda entry: entry["time"])
+ for i in range(len(results)):
+ results[i]['lons'] = sorted(results[i]['lons'],
+ key=lambda entry: entry['longitude'])
results = self.applyDeseasonToHofMoeller(results, pivot="lons")