You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by jj...@apache.org on 2018/09/17 22:52:56 UTC

[incubator-sdap-nexus] branch master updated: SDAP-50 Fix incorrect Hovmoller Implementation (#28)

This is an automated email from the ASF dual-hosted git repository.

jjacob pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 1997f20  SDAP-50 Fix incorrect Hovmoller Implementation (#28)
1997f20 is described below

commit 1997f20a615b6475a8b882dc591780a3f4b720a9
Author: Joseph Jacob <jj...@users.noreply.github.com>
AuthorDate: Mon Sep 17 15:52:52 2018 -0700

    SDAP-50 Fix incorrect Hovmoller Implementation (#28)
    
    This update combines Hovmoller calculation across NEXUS tiles, combines duplicate time stamps, and adds cos(lat) weighting for latitude-averaging.
---
 .../webservice/algorithms_spark/HofMoellerSpark.py | 248 +++++++++++++--------
 1 file changed, 160 insertions(+), 88 deletions(-)

diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index 63fbaa4..92d7a70 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+import sys
 import itertools
 import logging
 import traceback
@@ -35,82 +35,64 @@ LATITUDE = 0
 LONGITUDE = 1
 
 
-class LongitudeHofMoellerCalculator(object):
-    @staticmethod
-    def longitude_time_hofmoeller_stats(tile_in_spark):
-
-        (tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark
-
-        tile_service = NexusTileService()
-        try:
-            # Load the dataset tile
-            tile = tile_service.find_tile_by_id(tile_id)[0]
-            # Mask it to the search domain
-            tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, [tile])[0]
-        except IndexError:
-            return None
-
-        stat = {
-            'sequence': index,
-            'time': np.ma.min(tile.times),
-            'lons': []
-        }
-        points = list(tile.nexus_point_generator())
-        data = sorted(points, key=lambda p: p.longitude)
-        points_by_lon = itertools.groupby(data, key=lambda p: p.longitude)
-
-        for lon, points_at_lon in points_by_lon:
-            values_at_lon = np.array([point.data_val for point in points_at_lon])
-            stat['lons'].append({
-                'longitude': float(lon),
-                'cnt': len(values_at_lon),
-                'avg': np.mean(values_at_lon).item(),
-                'max': np.max(values_at_lon).item(),
-                'min': np.min(values_at_lon).item(),
-                'std': np.std(values_at_lon).item()
-            })
-
-        return stat
-
-
-class LatitudeHofMoellerCalculator(object):
+class HofMoellerCalculator(object):
     @staticmethod
-    def latitude_time_hofmoeller_stats(tile_in_spark):
+    def hofmoeller_stats(tile_in_spark):
 
-        (tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark
+        (latlon, tile_id, index,
+         min_lat, max_lat, min_lon, max_lon) = tile_in_spark
 
         tile_service = NexusTileService()
         try:
             # Load the dataset tile
             tile = tile_service.find_tile_by_id(tile_id)[0]
             # Mask it to the search domain
-            tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, [tile])[0]
+            tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat,
+                                                   min_lon, max_lon, [tile])[0]
         except IndexError:
-            return None
+            #return None
+            return []
 
-        stat = {
-            'sequence': index,
-            'time': np.ma.min(tile.times),
-            'lats': []
-        }
+        t = np.ma.min(tile.times)
+        stats = []
 
         points = list(tile.nexus_point_generator())
-        data = sorted(points, key=lambda p: p.latitude)
-        points_by_lat = itertools.groupby(data, key=lambda p: p.latitude)
-
-        for lat, points_at_lat in points_by_lat:
-            values_at_lat = np.array([point.data_val for point in points_at_lat])
-
-            stat['lats'].append({
-                'latitude': float(lat),
-                'cnt': len(values_at_lat),
-                'avg': np.mean(values_at_lat).item(),
-                'max': np.max(values_at_lat).item(),
-                'min': np.min(values_at_lat).item(),
-                'std': np.std(values_at_lat).item()
-            })
+        if latlon == 0:
+            # Latitude-Time Map (Average over longitudes)
+            data = sorted(points, key=lambda p: p.latitude)
+            points_by_coord = itertools.groupby(data, key=lambda p: p.latitude)
+        else:
+            # Longitude-Time Map (Average over latitudes)
+            data = sorted(points, key=lambda p: p.longitude)
+            points_by_coord = itertools.groupby(data, key=lambda p: p.longitude)
+            
+ 
+        for coord, points_at_coord in points_by_coord:
+            values_at_coord = np.array([[p.data_val,
+                                        np.cos(np.radians(p.latitude))]
+                                        for p in points_at_coord])
+            vals = np.nan_to_num(values_at_coord[:,0])
+            weights = values_at_coord[:,1]
+            coord_cnt = len(values_at_coord)
+            if latlon == 0:
+                # Latitude-Time Map (Average over longitudes)
+                # In this case there is no weighting by cos(lat)
+                weighted_sum = np.sum(vals).item()
+                sum_of_weights = coord_cnt
+            else:
+                # Longitude-Time Map (Average over latitudes)
+                # In this case we need to weight by cos(lat)
+                weighted_sum = np.dot(vals, weights)
+                sum_of_weights = np.sum(weights).item()
 
-        return stat
+            stats.append(((t, float(coord)), (t, index, float(coord),
+                                              coord_cnt,
+                                              weighted_sum,
+                                              sum_of_weights,
+                                              np.max(vals).item(),
+                                              np.min(vals).item(),
+                                              np.var(vals).item())))
+        return stats
 
 
 class BaseHoffMoellerHandlerImpl(SparkHandler):
@@ -150,6 +132,95 @@ def determine_parllelism(num_tiles):
     return num_partitions
 
 
+def parallel_variance(avg_a, count_a, var_a, avg_b, count_b, var_b):
+    # Thanks Wikipedia https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
+    delta = avg_b - avg_a
+    m_a = var_a * (count_a - 1)
+    m_b = var_b * (count_b - 1)
+    M2 = m_a + m_b + delta ** 2 * count_a * count_b / (count_a + count_b)
+    return M2 / (count_a + count_b - 1)
+
+
+def hof_tuple_time(t):
+    return t[0]
+
+
+def hof_tuple_combine(t1, t2):
+    return (t1[0], # Time
+            t1[1], # Sequence (index)
+            t1[2], # Coordinate on axis (latitude or longitude)
+            t1[3] + t2[3], # Number of values
+            t1[4] + t2[4], # Sum of values (weighted for lon-time maps)
+            t1[5] + t2[5], # Sum of weights (= # of values for lat-time maps)
+            max(t1[6], t2[6]), # Maximum value
+            min(t1[7], t2[7]), # Minimum value
+            parallel_variance(t1[4]/t1[5], t1[3], t1[8], 
+                              t2[4]/t2[5], t2[3], t2[8])) # Variance
+
+def hof_tuple_to_dict(t, avg_var_name):
+    return {avg_var_name: t[2],
+            'cnt': t[3],
+            'avg': t[4] / t[5],
+            'std': np.sqrt(t[8]),
+            'max': t[6],
+            'min': t[7]}
+
+def spark_driver(sc, latlon, nexus_tiles_spark):
+    # Parallelize list of tile ids
+    rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
+    if latlon == 0:
+        # Latitude-Time Map (Average over longitudes)
+        avg_var_name = 'latitude'
+        avg_var_name_collection = 'lats'
+    else:
+        # Longitude-Time Map (Average over latitudes)
+        avg_var_name = 'longitude'
+        avg_var_name_collection = 'lons'
+        
+    # Create a set of key-value pairs where the key is (time, lat|lon) and
+    # the value is a tuple of intermediate statistics for the specified
+    # coordinate within a single NEXUS tile.
+    results = rdd.flatMap(HofMoellerCalculator.hofmoeller_stats)
+
+    # Combine tuples across tiles with input key = (time, lat|lon)
+    # Output a key value pair with key = (time)
+    results = results.combineByKey(lambda val: (hof_tuple_time(val),val),
+                                   lambda x, val: (hof_tuple_time(x),
+                                                   hof_tuple_combine(x[1],
+                                                                     val)),
+                                   lambda x, y: (hof_tuple_time(x),
+                                                 hof_tuple_combine(x[1],
+                                                                   y[1])))
+
+    # Convert the tuples to dictionary entries and combine coordinates
+    # with the same time stamp.  Here we have input key = (time)
+    results = results.values().\
+              combineByKey(lambda val, avg_var_name=avg_var_name,
+                           avg_var_name_collection=avg_var_name_collection: {
+                               'sequence': val[1],
+                               'time': val[0],
+                               avg_var_name_collection: [
+                                   hof_tuple_to_dict(val, avg_var_name)]},
+                           lambda x, val, avg_var_name=avg_var_name,
+                           avg_var_name_collection=avg_var_name_collection: {
+                               'sequence': x['sequence'],
+                               'time': x['time'],
+                               avg_var_name_collection: (
+                                   x[avg_var_name_collection] +
+                                   [hof_tuple_to_dict(val, avg_var_name)])},
+                           lambda x, y,
+                             avg_var_name_collection=avg_var_name_collection:
+                             {'sequence': x['sequence'],
+                              'time': x['time'],
+                              avg_var_name_collection: (
+                                  x[avg_var_name_collection] +
+                                  y[avg_var_name_collection])}).\
+              values().\
+              collect()
+
+    return results
+
+
 @nexus_handler
 class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
     name = "Latitude/Time HofMoeller Spark"
@@ -159,27 +230,27 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
     singleton = True
 
     def __init__(self):
+        self._latlon = 0 # 0 for latitude-time map, 1 for longitude-time map
         BaseHoffMoellerHandlerImpl.__init__(self)
 
     def calc(self, computeOptions, **args):
-        nexus_tiles_spark = [(tile.tile_id, x, computeOptions.get_min_lat(), computeOptions.get_max_lat(),
-                              computeOptions.get_min_lon(), computeOptions.get_max_lon()) for x, tile in enumerate(
-            self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
-                                                 computeOptions.get_min_lon(), computeOptions.get_max_lon(),
-                                                 computeOptions.get_dataset()[0],
-                                                 computeOptions.get_start_time(),
-                                                 computeOptions.get_end_time(), fetch_data=False))]
-
+        nexus_tiles_spark = [(self._latlon, tile.tile_id, x,
+                              computeOptions.get_min_lat(),
+                              computeOptions.get_max_lat(),
+                              computeOptions.get_min_lon(),
+                              computeOptions.get_max_lon())
+                             for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon(), computeOptions.get_dataset()[0], computeOptions.get_start_time(), computeOptions.get_end_time(), fetch_data=False))]
+        print ("Got {} tiles".format(len(nexus_tiles_spark)))
         if len(nexus_tiles_spark) == 0:
             raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
 
-        # Parallelize list of tile ids
-        rdd = self._sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
-        results = rdd.map(LatitudeHofMoellerCalculator.latitude_time_hofmoeller_stats).collect()
-
+        results = spark_driver (self._sc, self._latlon, nexus_tiles_spark)
+                                                        
         results = filter(None, results)
-        results = sorted(results, key=lambda entry: entry["time"])
-
+        results = sorted(results, key=lambda entry: entry['time'])
+        for i in range(len(results)):
+            results[i]['lats'] = sorted(results[i]['lats'],
+                                        key=lambda entry: entry['latitude'])
         results = self.applyDeseasonToHofMoeller(results)
 
         result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LATITUDE)
@@ -195,26 +266,27 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
     singleton = True
 
     def __init__(self):
+        self._latlon = 1 # 0 for latitude-time map; 1 for longitude-time map
         BaseHoffMoellerHandlerImpl.__init__(self)
 
     def calc(self, computeOptions, **args):
-        nexus_tiles_spark = [(tile.tile_id, x, computeOptions.get_min_lat(), computeOptions.get_max_lat(),
-                              computeOptions.get_min_lon(), computeOptions.get_max_lon()) for x, tile in enumerate(
-            self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
-                                                 computeOptions.get_min_lon(), computeOptions.get_max_lon(),
-                                                 computeOptions.get_dataset()[0],
-                                                 computeOptions.get_start_time(),
-                                                 computeOptions.get_end_time(), fetch_data=False))]
+        nexus_tiles_spark = [(self._latlon, tile.tile_id, x,
+                              computeOptions.get_min_lat(),
+                              computeOptions.get_max_lat(),
+                              computeOptions.get_min_lon(),
+                              computeOptions.get_max_lon())
+                             for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon(), computeOptions.get_dataset()[0], computeOptions.get_start_time(), computeOptions.get_end_time(), fetch_data=False))]
 
         if len(nexus_tiles_spark) == 0:
             raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
 
-        # Parallelize list of tile ids
-        rdd = self._sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
-        results = rdd.map(LongitudeHofMoellerCalculator.longitude_time_hofmoeller_stats).collect()
+        results = spark_driver (self._sc, self._latlon, nexus_tiles_spark)
 
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry["time"])
+        for i in range(len(results)):
+            results[i]['lons'] = sorted(results[i]['lons'],
+                                        key=lambda entry: entry['longitude'])
 
         results = self.applyDeseasonToHofMoeller(results, pivot="lons")