You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@sdap.apache.org by GitBox <gi...@apache.org> on 2018/09/17 22:52:54 UTC

[GitHub] jjacob7734 closed pull request #28: SDAP-50 Fix incorrect Hovmoller Implementation

jjacob7734 closed pull request #28: SDAP-50 Fix incorrect Hovmoller Implementation
URL: https://github.com/apache/incubator-sdap-nexus/pull/28
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
index 63fbaa4..92d7a70 100644
--- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py
+++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+import sys
 import itertools
 import logging
 import traceback
@@ -35,82 +35,64 @@
 LONGITUDE = 1
 
 
-class LongitudeHofMoellerCalculator(object):
-    @staticmethod
-    def longitude_time_hofmoeller_stats(tile_in_spark):
-
-        (tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark
-
-        tile_service = NexusTileService()
-        try:
-            # Load the dataset tile
-            tile = tile_service.find_tile_by_id(tile_id)[0]
-            # Mask it to the search domain
-            tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, [tile])[0]
-        except IndexError:
-            return None
-
-        stat = {
-            'sequence': index,
-            'time': np.ma.min(tile.times),
-            'lons': []
-        }
-        points = list(tile.nexus_point_generator())
-        data = sorted(points, key=lambda p: p.longitude)
-        points_by_lon = itertools.groupby(data, key=lambda p: p.longitude)
-
-        for lon, points_at_lon in points_by_lon:
-            values_at_lon = np.array([point.data_val for point in points_at_lon])
-            stat['lons'].append({
-                'longitude': float(lon),
-                'cnt': len(values_at_lon),
-                'avg': np.mean(values_at_lon).item(),
-                'max': np.max(values_at_lon).item(),
-                'min': np.min(values_at_lon).item(),
-                'std': np.std(values_at_lon).item()
-            })
-
-        return stat
-
-
-class LatitudeHofMoellerCalculator(object):
+class HofMoellerCalculator(object):
     @staticmethod
-    def latitude_time_hofmoeller_stats(tile_in_spark):
+    def hofmoeller_stats(tile_in_spark):
 
-        (tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark
+        (latlon, tile_id, index,
+         min_lat, max_lat, min_lon, max_lon) = tile_in_spark
 
         tile_service = NexusTileService()
         try:
             # Load the dataset tile
             tile = tile_service.find_tile_by_id(tile_id)[0]
             # Mask it to the search domain
-            tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, max_lon, [tile])[0]
+            tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat,
+                                                   min_lon, max_lon, [tile])[0]
         except IndexError:
-            return None
+            #return None
+            return []
 
-        stat = {
-            'sequence': index,
-            'time': np.ma.min(tile.times),
-            'lats': []
-        }
+        t = np.ma.min(tile.times)
+        stats = []
 
         points = list(tile.nexus_point_generator())
-        data = sorted(points, key=lambda p: p.latitude)
-        points_by_lat = itertools.groupby(data, key=lambda p: p.latitude)
-
-        for lat, points_at_lat in points_by_lat:
-            values_at_lat = np.array([point.data_val for point in points_at_lat])
-
-            stat['lats'].append({
-                'latitude': float(lat),
-                'cnt': len(values_at_lat),
-                'avg': np.mean(values_at_lat).item(),
-                'max': np.max(values_at_lat).item(),
-                'min': np.min(values_at_lat).item(),
-                'std': np.std(values_at_lat).item()
-            })
+        if latlon == 0:
+            # Latitude-Time Map (Average over longitudes)
+            data = sorted(points, key=lambda p: p.latitude)
+            points_by_coord = itertools.groupby(data, key=lambda p: p.latitude)
+        else:
+            # Longitude-Time Map (Average over latitudes)
+            data = sorted(points, key=lambda p: p.longitude)
+            points_by_coord = itertools.groupby(data, key=lambda p: p.longitude)
+            
+ 
+        for coord, points_at_coord in points_by_coord:
+            values_at_coord = np.array([[p.data_val,
+                                        np.cos(np.radians(p.latitude))]
+                                        for p in points_at_coord])
+            vals = np.nan_to_num(values_at_coord[:,0])
+            weights = values_at_coord[:,1]
+            coord_cnt = len(values_at_coord)
+            if latlon == 0:
+                # Latitude-Time Map (Average over longitudes)
+                # In this case there is no weighting by cos(lat)
+                weighted_sum = np.sum(vals).item()
+                sum_of_weights = coord_cnt
+            else:
+                # Longitude-Time Map (Average over latitudes)
+                # In this case we need to weight by cos(lat)
+                weighted_sum = np.dot(vals, weights)
+                sum_of_weights = np.sum(weights).item()
 
-        return stat
+            stats.append(((t, float(coord)), (t, index, float(coord),
+                                              coord_cnt,
+                                              weighted_sum,
+                                              sum_of_weights,
+                                              np.max(vals).item(),
+                                              np.min(vals).item(),
+                                              np.var(vals).item())))
+        return stats
 
 
 class BaseHoffMoellerHandlerImpl(SparkHandler):
@@ -150,6 +132,95 @@ def determine_parllelism(num_tiles):
     return num_partitions
 
 
+def parallel_variance(avg_a, count_a, var_a, avg_b, count_b, var_b):
+    # Thanks Wikipedia https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm
+    delta = avg_b - avg_a
+    m_a = var_a * (count_a - 1)
+    m_b = var_b * (count_b - 1)
+    M2 = m_a + m_b + delta ** 2 * count_a * count_b / (count_a + count_b)
+    return M2 / (count_a + count_b - 1)
+
+
+def hof_tuple_time(t):
+    return t[0]
+
+
+def hof_tuple_combine(t1, t2):
+    return (t1[0], # Time
+            t1[1], # Sequence (index)
+            t1[2], # Coordinate on axis (latitude or longitude)
+            t1[3] + t2[3], # Number of values
+            t1[4] + t2[4], # Sum of values (weighted for lon-time maps)
+            t1[5] + t2[5], # Sum of weights (= # of values for lat-time maps)
+            max(t1[6], t2[6]), # Maximum value
+            min(t1[7], t2[7]), # Minimum value
+            parallel_variance(t1[4]/t1[5], t1[3], t1[8], 
+                              t2[4]/t2[5], t2[3], t2[8])) # Variance
+
+def hof_tuple_to_dict(t, avg_var_name):
+    return {avg_var_name: t[2],
+            'cnt': t[3],
+            'avg': t[4] / t[5],
+            'std': np.sqrt(t[8]),
+            'max': t[6],
+            'min': t[7]}
+
+def spark_driver(sc, latlon, nexus_tiles_spark):
+    # Parallelize list of tile ids
+    rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
+    if latlon == 0:
+        # Latitude-Time Map (Average over longitudes)
+        avg_var_name = 'latitude'
+        avg_var_name_collection = 'lats'
+    else:
+        # Longitude-Time Map (Average over latitudes)
+        avg_var_name = 'longitude'
+        avg_var_name_collection = 'lons'
+        
+    # Create a set of key-value pairs where the key is (time, lat|lon) and
+    # the value is a tuple of intermediate statistics for the specified
+    # coordinate within a single NEXUS tile.
+    results = rdd.flatMap(HofMoellerCalculator.hofmoeller_stats)
+
+    # Combine tuples across tiles with input key = (time, lat|lon)
+    # Output a key value pair with key = (time)
+    results = results.combineByKey(lambda val: (hof_tuple_time(val),val),
+                                   lambda x, val: (hof_tuple_time(x),
+                                                   hof_tuple_combine(x[1],
+                                                                     val)),
+                                   lambda x, y: (hof_tuple_time(x),
+                                                 hof_tuple_combine(x[1],
+                                                                   y[1])))
+
+    # Convert the tuples to dictionary entries and combine coordinates
+    # with the same time stamp.  Here we have input key = (time)
+    results = results.values().\
+              combineByKey(lambda val, avg_var_name=avg_var_name,
+                           avg_var_name_collection=avg_var_name_collection: {
+                               'sequence': val[1],
+                               'time': val[0],
+                               avg_var_name_collection: [
+                                   hof_tuple_to_dict(val, avg_var_name)]},
+                           lambda x, val, avg_var_name=avg_var_name,
+                           avg_var_name_collection=avg_var_name_collection: {
+                               'sequence': x['sequence'],
+                               'time': x['time'],
+                               avg_var_name_collection: (
+                                   x[avg_var_name_collection] +
+                                   [hof_tuple_to_dict(val, avg_var_name)])},
+                           lambda x, y,
+                             avg_var_name_collection=avg_var_name_collection:
+                             {'sequence': x['sequence'],
+                              'time': x['time'],
+                              avg_var_name_collection: (
+                                  x[avg_var_name_collection] +
+                                  y[avg_var_name_collection])}).\
+              values().\
+              collect()
+
+    return results
+
+
 @nexus_handler
 class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
     name = "Latitude/Time HofMoeller Spark"
@@ -159,27 +230,27 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
     singleton = True
 
     def __init__(self):
+        self._latlon = 0 # 0 for latitude-time map, 1 for longitude-time map
         BaseHoffMoellerHandlerImpl.__init__(self)
 
     def calc(self, computeOptions, **args):
-        nexus_tiles_spark = [(tile.tile_id, x, computeOptions.get_min_lat(), computeOptions.get_max_lat(),
-                              computeOptions.get_min_lon(), computeOptions.get_max_lon()) for x, tile in enumerate(
-            self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
-                                                 computeOptions.get_min_lon(), computeOptions.get_max_lon(),
-                                                 computeOptions.get_dataset()[0],
-                                                 computeOptions.get_start_time(),
-                                                 computeOptions.get_end_time(), fetch_data=False))]
-
+        nexus_tiles_spark = [(self._latlon, tile.tile_id, x,
+                              computeOptions.get_min_lat(),
+                              computeOptions.get_max_lat(),
+                              computeOptions.get_min_lon(),
+                              computeOptions.get_max_lon())
+                             for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon(), computeOptions.get_dataset()[0], computeOptions.get_start_time(), computeOptions.get_end_time(), fetch_data=False))]
+        print ("Got {} tiles".format(len(nexus_tiles_spark)))
         if len(nexus_tiles_spark) == 0:
             raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
 
-        # Parallelize list of tile ids
-        rdd = self._sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
-        results = rdd.map(LatitudeHofMoellerCalculator.latitude_time_hofmoeller_stats).collect()
-
+        results = spark_driver (self._sc, self._latlon, nexus_tiles_spark)
+                                                        
         results = filter(None, results)
-        results = sorted(results, key=lambda entry: entry["time"])
-
+        results = sorted(results, key=lambda entry: entry['time'])
+        for i in range(len(results)):
+            results[i]['lats'] = sorted(results[i]['lats'],
+                                        key=lambda entry: entry['latitude'])
         results = self.applyDeseasonToHofMoeller(results)
 
         result = HoffMoellerResults(results=results, computeOptions=computeOptions, type=HoffMoellerResults.LATITUDE)
@@ -195,26 +266,27 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerHandlerImpl):
     singleton = True
 
     def __init__(self):
+        self._latlon = 1 # 0 for latitude-time map; 1 for longitude-time map
         BaseHoffMoellerHandlerImpl.__init__(self)
 
     def calc(self, computeOptions, **args):
-        nexus_tiles_spark = [(tile.tile_id, x, computeOptions.get_min_lat(), computeOptions.get_max_lat(),
-                              computeOptions.get_min_lon(), computeOptions.get_max_lon()) for x, tile in enumerate(
-            self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(),
-                                                 computeOptions.get_min_lon(), computeOptions.get_max_lon(),
-                                                 computeOptions.get_dataset()[0],
-                                                 computeOptions.get_start_time(),
-                                                 computeOptions.get_end_time(), fetch_data=False))]
+        nexus_tiles_spark = [(self._latlon, tile.tile_id, x,
+                              computeOptions.get_min_lat(),
+                              computeOptions.get_max_lat(),
+                              computeOptions.get_min_lon(),
+                              computeOptions.get_max_lon())
+                             for x, tile in enumerate(self._tile_service.find_tiles_in_box(computeOptions.get_min_lat(), computeOptions.get_max_lat(), computeOptions.get_min_lon(), computeOptions.get_max_lon(), computeOptions.get_dataset()[0], computeOptions.get_start_time(), computeOptions.get_end_time(), fetch_data=False))]
 
         if len(nexus_tiles_spark) == 0:
             raise NexusProcessingException.NoDataException(reason="No data found for selected timeframe")
 
-        # Parallelize list of tile ids
-        rdd = self._sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark)))
-        results = rdd.map(LongitudeHofMoellerCalculator.longitude_time_hofmoeller_stats).collect()
+        results = spark_driver (self._sc, self._latlon, nexus_tiles_spark)
 
         results = filter(None, results)
         results = sorted(results, key=lambda entry: entry["time"])
+        for i in range(len(results)):
+            results[i]['lons'] = sorted(results[i]['lons'],
+                                        key=lambda entry: entry['longitude'])
 
         results = self.applyDeseasonToHofMoeller(results, pivot="lons")
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services