Posted to commits@sdap.apache.org by nc...@apache.org on 2021/12/09 00:49:46 UTC

[incubator-sdap-nexus] branch master updated: SDAP-361 Time series spark algorithm unit test (#148)

This is an automated email from the ASF dual-hosted git repository.

nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e6176c  SDAP-361 Time series spark algorithm unit test (#148)
3e6176c is described below

commit 3e6176cedfae466e529d9c74804a07430cb73c32
Author: Kevin <ke...@gmail.com>
AuthorDate: Wed Dec 8 16:49:42 2021 -0800

    SDAP-361 Time series spark algorithm unit test (#148)
    
    * first pass at time series unittest
    
    * Created timeseriesspark unit test for the calc function
    
    * Added mock patch to avoid saving mean time series netCDF file
    
    * Added tests for calc_average_on_day, calc with seasonal filter for monthly data and daily data, and calc with seasonal and lowpass filters
    
    * commented out lowpass filter assertions for the time being
    
    * Fixed bug in test_calc_average_on_day
---
 .../tests/algorithms_spark/test_timeseriesspark.py | 630 +++++++++++++++++++++
 1 file changed, 630 insertions(+)

diff --git a/analysis/tests/algorithms_spark/test_timeseriesspark.py b/analysis/tests/algorithms_spark/test_timeseriesspark.py
new file mode 100644
index 0000000..bbb1a32
--- /dev/null
+++ b/analysis/tests/algorithms_spark/test_timeseriesspark.py
@@ -0,0 +1,630 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import json
+from datetime import datetime
+import mock
+import numpy as np
+from pytz import timezone as tz
+import itertools
+from random import uniform
+
+import pytest
+import webservice.algorithms_spark.TimeSeriesSpark as timeseries
+from nexustiles.model.nexusmodel import Tile
+from pyspark.sql import SparkSession
+from shapely import wkt
+from shapely.geometry import box
+from webservice.algorithms_spark.TimeSeriesSpark import TimeSeriesSparkHandlerImpl
+
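+# Simple wrapper that exposes its payload via a .value attribute, mirroring how
+# the algorithm code reads its parameters (e.g. Spark broadcast variables).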
+class MockSparkParam:
+    def __init__(self, value):
+        self.value = value
+
+@pytest.fixture(scope='function')
+def timeseries_args():
+    tile_ids = [1]
+    polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))'
+    ds_name = 'test-dataset'
+    start = '2020-01-15T00:00:00Z'
+    end = '2020-01-15T00:00:00Z'
+    seasonal = False
+    low_pass = False
+
+    yield dict(
+        tile_ids=tile_ids,
+        ds=MockSparkParam(ds_name),
+        start=MockSparkParam(start),
+        end=MockSparkParam(end),
+        seasonal=MockSparkParam(seasonal),
+        low_pass=MockSparkParam(low_pass),
+        bounding_wkt=MockSparkParam(polygon_wkt),
+    )
+
+
+def generate_monthly_stats():
+    """
+    Generates dummy monthly stats (one randomly-filled entry per month of
+    2020). Returns (stats_arr, clim_stats), where clim_stats holds the same
+    entries keyed by month.
+    """
+    times = [1577836800, 1580515200, 1583020800, 1585699200, 1588291200, 1590969600,
+    1593561600, 1596240000, 1598918400, 1601510400, 1604188800, 1606780800]
+
+    stats_arr = [[]]
+    for time in times:
+        vals = [uniform(0,100) for i in range(843)]
+        stats_arr[0].append({
+            'min': min(vals),
+            'max': max(vals),
+            'mean': np.mean(vals),
+            'cnt': 843,
+            'std': np.std(vals),
+            'time': int(time),
+            'iso_time': datetime.utcfromtimestamp(int(time)).replace(tzinfo=tz('UTC')).strftime('%Y-%m-%dT%H:%M:%S%z')
+        })
+
+    clim_stats = {datetime.utcfromtimestamp(result['time']).month: result for result in stats_arr[0]}
+
+    return stats_arr, clim_stats
+
+def generate_daily_stats():
+    """
+    Generates dummy daily stats (one randomly-filled entry per day of 2020, a
+    leap year). Returns (stats_arr, clim_stats), where clim_stats is keyed by
+    month.
+    """
+    times = [1577836800 + (i * 86400) for i in range(0,366)]
+    
+    stats_arr = [[]]
+    for time in times:
+        vals = [uniform(0,100) for i in range(843)]
+        stats_arr[0].append({
+            'min': np.min(vals),
+            'max': np.max(vals),
+            'mean': np.mean(vals),
+            'cnt': 843,
+            'std': np.std(vals),
+            'time': int(time),
+            'iso_time': datetime.utcfromtimestamp(int(time)).replace(tzinfo=tz('UTC')).strftime('%Y-%m-%dT%H:%M:%S%z')
+        })
+
+    clim_stats = {datetime.utcfromtimestamp(result['time']).month: result for result in stats_arr[0]}
+
+    return stats_arr, clim_stats
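+
+# Both generators above return (stats_arr, clim_stats) shaped roughly as follows
+# (stat values vary because they are random):
+#   stats_arr  -> [[{'min': ..., 'max': ..., 'mean': ..., 'cnt': 843, 'std': ...,
+#                    'time': 1577836800, 'iso_time': '2020-01-01T00:00:00+0000'}, ...]]
+#   clim_stats -> {1: <January entry>, 2: <February entry>, ..., 12: <December entry>}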
+
+
+def test_calc_average_on_day():
+    """
+    Tests the calc_average_on_day function in TimeSeriesSpark.py
+    """
+    timestamps = [1578096000, 1578096001, 1578096002]
+    data = [1,2,3]
+
+    test_tile = Tile(
+        tile_id='test-tile',
+        bbox='',
+        dataset='test-dataset',
+        dataset_id='test-dataset',
+        granule='test-granule',
+        min_time='2020-07-28T00:00:00',
+        max_time='2020-07-28T00:00:00',
+        section_spec='2020-07-28T00:00:00',
+        meta_data={},
+        is_multi=True
+    )
+
+    test_tile.data = np.ma.MaskedArray(data, [0, 0, 0])
+    test_tile.times = timestamps
+    test_tile.latitudes = [45]
+    test_tile.longitudes = [60, 60, 60]
+
+    def setup_mock_tile_service(tile):
+        tile_service_factory = mock.MagicMock()
+        tile_service = mock.MagicMock()
+        tile_service_factory.return_value = tile_service
+        tile_service.get_bounding_box.return_value = box(-90, -45, 90, 45)
+        tile_service.get_min_time.return_value = 1627490285
+        tile_service.get_max_time.return_value = 1627490285
+        tile_service.get_tiles_bounded_by_polygon.return_value = [test_tile]
+        tile_service.mask_tiles_to_polygon.return_value = [test_tile]
+        return tile_service_factory
+
+    def callback(calculation):
+        """
+        Dummy function used for metrics callback
+        """
+        pass
+
+    # Spark tile format: (polygon string, ds name, list of time stamps, fill value)
+    spark_tile = ('POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))', 
+                    'dataset', timestamps, -9999.)
+
+    avg_args = dict(
+        tile_service_factory = setup_mock_tile_service(test_tile),
+        metrics_callback = callback,
+        normalize_dates = False,
+        tile_in_spark = spark_tile
+    )
+    stats = timeseries.calc_average_on_day(**avg_args)
+
+    assert stats[0][0]['min'] == np.min(data)
+    assert stats[0][0]['max'] == np.max(data)
+    assert stats[0][0]['mean'] == np.mean(data)
+    assert stats[0][0]['cnt'] == len(data)
+    assert stats[0][0]['std'] == np.std(data)
+    assert stats[0][0]['time'] == timestamps[0]
+    assert stats[0][0]['iso_time'] == datetime.utcfromtimestamp(int(timestamps[0])).replace(tzinfo=tz('UTC')).strftime('%Y-%m-%dT%H:%M:%S%z')
+    
+
+def test_calc(timeseries_args):
+    """
+    Assert that the expected functions are called during the timeseries
+    calculation and that the results are formatted as expected.
+    """
+    # Mock anything that connects to external dependencies (Solr, Cassandra, ...)
+    tile_service_factory = mock.MagicMock()
+    tile_service = mock.MagicMock()
+    tile_service_factory.return_value = tile_service
+    spark = SparkSession.builder.appName('nexus-analysis').getOrCreate()
+    spark_context = spark.sparkContext
+    request = mock.MagicMock()
+    request.get_argument.return_value = '1,2,3,4,5,6,7,8'
+
+    # Generate the dummy data
+    stats_arr, clim_stats = generate_monthly_stats()
+
+    # Dummy bounding polygon
+    polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))'
+
+    # Patch in request arguments
+    args = {
+        'ds': ['test-dataset'],
+        'bounding_polygon': wkt.loads(polygon_wkt),
+        'start_seconds_from_epoch': stats_arr[0][0]['time'],
+        'end_seconds_from_epoch': stats_arr[0][-1]['time'],
+        'apply_seasonal_cycle_filter': False,
+        'apply_low_pass_filter': False,
+        'nparts_requested': 1,
+        'normalize_dates': False,
+    }
+
+    def generate_fake_tile(tile_id):
+        tile = Tile()
+        tile.tile_id = tile_id
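+        # Note: the trailing comma makes min_time a 1-tuple, which is why it is
+        # indexed as f.min_time[0] below.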
+        tile.min_time = 1627490285,
+        return tile
+
+    # Mock tiles
+    fake_tiles = [generate_fake_tile(idx) for idx in range(10)]
+    tile_service.find_days_in_range_asc.return_value = [f.min_time[0]+1 for f in fake_tiles]
+
+    # Mock result
+    # 'spark_driver' returns results, meta. 
+    # Format of results: [[{stats}, {stats},...]]
+    # Format of meta: {}
+    fake_spark_result = list(itertools.chain.from_iterable(stats_arr))
+    fake_spark_result = sorted(fake_spark_result, key=lambda entry: entry["time"]), {}
+
+    timeseries_obj = TimeSeriesSparkHandlerImpl(tile_service_factory=tile_service_factory, sc=spark_context)
+    timeseries_obj.parse_arguments = lambda _: [item for item in args.values()]
+
+    # Mock the spark driver with our fake spark results
+    # Also mock _create_nc_file_time1d to avoid writing the .nc file 
+    with mock.patch('webservice.algorithms_spark.TimeSeriesSpark.spark_driver') as mock_driver, \
+        mock.patch('webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler._create_nc_file_time1d') as mock_create_nc:
+        mock_driver.return_value = fake_spark_result
+        timeseries_result = timeseries_obj.calc(request)
+        # Ensure the call to 'spark_driver' contains the expected params
+        assert len(mock_driver.call_args_list) == 1
+
+        # Check args
+        mock_driver_args = mock_driver.call_args_list[0].args
+        mock_driver_kwargs = mock_driver.call_args_list[0].kwargs    
+        assert mock_driver_args[0] == [f.min_time[0]+1 for f in fake_tiles]
+        assert mock_driver_args[1] == wkt.loads(polygon_wkt)
+        assert mock_driver_args[2] == args['ds'][0]
+        assert mock_driver_args[3] == tile_service_factory
+        assert mock_driver_args[5] == args['normalize_dates']
+        assert mock_driver_kwargs['sc'] == spark_context
+
+        # Ensure the result of the timeseries is as expected
+        json_timeseries_result = json.loads(timeseries_result.toJson())
+
+        # Meta
+        assert len(json_timeseries_result['meta']) == 1
+        assert json_timeseries_result['meta'][0]['shortName'] == args['ds'][0]
+        assert json_timeseries_result['meta'][0]['bounds']['north'] == 31
+        assert json_timeseries_result['meta'][0]['bounds']['east'] == -30.1
+        assert json_timeseries_result['meta'][0]['bounds']['south'] == 29.54
+        assert json_timeseries_result['meta'][0]['bounds']['west'] == -34.98
+        assert json_timeseries_result['meta'][0]['time']['start'] == stats_arr[0][0]['time']
+        assert json_timeseries_result['meta'][0]['time']['stop'] == stats_arr[0][-1]['time']
+
+        # Data
+        for result_data, stats_data in zip(json_timeseries_result['data'][0], stats_arr[0]):
+            assert result_data['min'] == stats_data['min']
+            assert result_data['max'] == stats_data['max']
+            assert result_data['mean'] == stats_data['mean']
+            assert result_data['cnt'] == stats_data['cnt']
+            assert result_data['std'] == stats_data['std']
+            assert result_data['time'] == stats_data['time']
+            assert result_data['iso_time'] == stats_data['iso_time']
+
+        # Stats
+        assert json_timeseries_result['stats'] == {}
+
+
+"""
+Seasonal filter = true works as follows:
+- Performs normal calculations on data
+- Performs calculations on clim data
+    - Indexes results by month
+    - If there aren't results for each month it raises exception
+- Calculates and stores min, max, mean seasonal values
+    - The way we mock the services means the clim data is the same as the
+        first day of the non-clim data.
+"""
+def test_calc_seasonal_filter(timeseries_args):
+    """
+    Same as test_calc, but with the seasonal cycle filter enabled for monthly
+    dummy data. Also asserts that spark_driver is called a second time for the
+    clim dataset and that the seasonal (data minus clim) values are returned.
+    """
+    # Mock anything that connects to external dependencies (Solr, Cassandra, ...)
+    tile_service_factory = mock.MagicMock()
+    tile_service = mock.MagicMock()
+    tile_service_factory.return_value = tile_service
+    spark = SparkSession.builder.appName('nexus-analysis').getOrCreate()
+    spark_context = spark.sparkContext
+    request = mock.MagicMock()
+    request.get_argument.return_value = '1,2,3,4,5,6,7,8'
+
+    # Generate the dummy data
+    stats_arr, clim_stats = generate_monthly_stats()
+
+    # Dummy bounding polygon
+    polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))'
+    
+    # Patch in request arguments
+    args = {
+        'ds': ['test-dataset'],
+        'bounding_polygon': wkt.loads(polygon_wkt),
+        'start_seconds_from_epoch': stats_arr[0][0]['time'],
+        'end_seconds_from_epoch': stats_arr[0][-1]['time'],
+        'apply_seasonal_cycle_filter': True,
+        'apply_low_pass_filter': False,
+        'nparts_requested': 1,
+        'normalize_dates': False,
+    }    
+
+    def generate_fake_tile(tile_id):
+        tile = Tile()
+        tile.tile_id = tile_id
+        tile.min_time = 1627490285,
+        return tile
+
+    # Mock tiles
+    fake_tiles = [generate_fake_tile(idx) for idx in range(10)]
+    tile_service.find_days_in_range_asc.return_value = [f.min_time[0]+1 for f in fake_tiles]
+
+    # Mock result
+    # 'spark_driver' returns results, meta. 
+    # Format of results: [[{stats}, {stats}, ...]]
+    # Format of meta: {}
+    fake_spark_result = list(itertools.chain.from_iterable(stats_arr))
+    fake_spark_result = sorted(fake_spark_result, key=lambda entry: entry["time"]), {}
+
+    timeseries_obj = TimeSeriesSparkHandlerImpl(tile_service_factory=tile_service_factory, sc=spark_context)
+    timeseries_obj.parse_arguments = lambda _: [item for item in args.values()]
+
+    # Mock the spark driver with our fake spark results
+    # Also mock _create_nc_file_time1d to avoid writing the .nc file 
+    with mock.patch('webservice.algorithms_spark.TimeSeriesSpark.spark_driver') as mock_driver, \
+        mock.patch('webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler._create_nc_file_time1d') as mock_create_nc:
+        mock_driver.return_value = fake_spark_result
+        timeseries_result = timeseries_obj.calc(request)
+        
+        # Ensure the call to 'spark_driver' contains the expected params
+
+        # Seasonal filter uses clim, so call_args_list is of length 2
+        assert len(mock_driver.call_args_list) == 2
+
+        # Check data args
+        mock_driver_args = mock_driver.call_args_list[0].args
+        mock_driver_kwargs = mock_driver.call_args_list[0].kwargs    
+        assert mock_driver_args[0] == [f.min_time[0]+1 for f in fake_tiles]
+        assert mock_driver_args[1] == wkt.loads(polygon_wkt)
+        assert mock_driver_args[2] == args['ds'][0]
+        assert mock_driver_args[3] == tile_service_factory
+        assert mock_driver_args[5] == args['normalize_dates']
+        assert mock_driver_kwargs['sc'] == spark_context
+
+        # And clim args
+        mock_driver_args = mock_driver.call_args_list[1].args
+        mock_driver_kwargs = mock_driver.call_args_list[1].kwargs    
+        assert mock_driver_args[0] == [f.min_time[0]+1 for f in fake_tiles]
+        assert mock_driver_args[1] == wkt.loads(polygon_wkt)
+        assert mock_driver_args[2] == args['ds'][0] + '_clim'
+        assert mock_driver_args[3] == tile_service_factory
+        assert mock_driver_kwargs['sc'] == spark_context
+
+        # Ensure the result of the timeseries is as expected
+        json_timeseries_result = json.loads(timeseries_result.toJson())
+
+        # Meta
+        assert len(json_timeseries_result['meta']) == 1
+        assert json_timeseries_result['meta'][0]['shortName'] == args['ds'][0]
+        assert json_timeseries_result['meta'][0]['bounds']['north'] == 31
+        assert json_timeseries_result['meta'][0]['bounds']['east'] == -30.1
+        assert json_timeseries_result['meta'][0]['bounds']['south'] == 29.54
+        assert json_timeseries_result['meta'][0]['bounds']['west'] == -34.98
+        assert json_timeseries_result['meta'][0]['time']['start'] == stats_arr[0][0]['time']
+        assert json_timeseries_result['meta'][0]['time']['stop'] == stats_arr[0][-1]['time']
+
+        # Data
+        for result_data, stats_data in zip(json_timeseries_result['data'][0], stats_arr[0]):
+            month = datetime.utcfromtimestamp(result_data['time']).month
+
+            assert result_data['min'] == stats_data['min']
+            assert result_data['max'] == stats_data['max']
+            assert result_data['mean'] == stats_data['mean']
+            assert result_data['cnt'] == stats_data['cnt']
+            assert result_data['std'] == stats_data['std']
+            assert result_data['time'] == stats_data['time']
+            assert result_data['iso_time'] == stats_data['iso_time']
+            # Clim uses the first-of-the-month values
+            assert result_data['minSeasonal'] == result_data['min'] - clim_stats[month]['min']
+            assert result_data['maxSeasonal'] == result_data['max'] - clim_stats[month]['max']
+            assert result_data['meanSeasonal'] == result_data['mean'] - clim_stats[month]['mean']
+
+        # Stats
+        assert json_timeseries_result['stats'] == {}
+
+def test_calc_seasonal_filter_daily(timeseries_args):
+    """
+    Same as test_calc_seasonal_filter, but using daily rather than monthly
+    dummy data.
+    """
+    # Mock anything that connects to external dependencies (Solr, Cassandra, ...)
+    tile_service_factory = mock.MagicMock()
+    tile_service = mock.MagicMock()
+    tile_service_factory.return_value = tile_service
+    spark = SparkSession.builder.appName('nexus-analysis').getOrCreate()
+    spark_context = spark.sparkContext
+    request = mock.MagicMock()
+    request.get_argument.return_value = '1,2,3,4,5,6,7,8'
+
+    # Generate the dummy data
+    stats_arr, clim_stats = generate_daily_stats()
+
+    # Dummy bounding polygon
+    polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))'
+
+    # Patch in request arguments
+    args = {
+        'ds': ['test-dataset'],
+        'bounding_polygon': wkt.loads(polygon_wkt),
+        'start_seconds_from_epoch': stats_arr[0][0]['time'],
+        'end_seconds_from_epoch': stats_arr[0][-1]['time'],
+        'apply_seasonal_cycle_filter': True,
+        'apply_low_pass_filter': False,
+        'nparts_requested': 1,
+        'normalize_dates': False,
+    }
+
+    def generate_fake_tile(tile_id):
+        tile = Tile()
+        tile.tile_id = tile_id
+        tile.min_time = 1627490285,
+        return tile
+
+    # Mock tiles
+    fake_tiles = [generate_fake_tile(idx) for idx in range(10)]
+    tile_service.find_days_in_range_asc.return_value = [f.min_time[0]+1 for f in fake_tiles]
+
+    # Mock result
+    # 'spark_driver' returns results, meta. 
+    # Format of results: [[{stats}, {stats}]]
+    # Format of meta: {}
+    fake_spark_result = list(itertools.chain.from_iterable(stats_arr))
+    fake_spark_result = sorted(fake_spark_result, key=lambda entry: entry["time"]), {}
+
+    timeseries_obj = TimeSeriesSparkHandlerImpl(tile_service_factory=tile_service_factory, sc=spark_context)
+    timeseries_obj.parse_arguments = lambda _: [item for item in args.values()]
+
+    # Mock the spark driver with our fake spark results
+    # Also mock _create_nc_file_time1d to avoid writing the .nc file 
+    with mock.patch('webservice.algorithms_spark.TimeSeriesSpark.spark_driver') as mock_driver, \
+        mock.patch('webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler._create_nc_file_time1d') as mock_create_nc:
+        mock_driver.return_value = fake_spark_result
+        timeseries_result = timeseries_obj.calc(request)
+
+        # Ensure the call to 'spark_driver' contains the expected params
+
+        # Seasonal filter uses clim, so call_args_list is of length 2
+        assert len(mock_driver.call_args_list) == 2
+
+        # Check data args
+        mock_driver_args = mock_driver.call_args_list[0].args
+        mock_driver_kwargs = mock_driver.call_args_list[0].kwargs    
+        assert mock_driver_args[0] == [f.min_time[0]+1 for f in fake_tiles]
+        assert mock_driver_args[1] == wkt.loads(polygon_wkt)
+        assert mock_driver_args[2] == args['ds'][0]
+        assert mock_driver_args[3] == tile_service_factory
+        assert mock_driver_args[5] == args['normalize_dates']
+        assert mock_driver_kwargs['sc'] == spark_context
+
+        # And clim args
+        mock_driver_args = mock_driver.call_args_list[1].args
+        mock_driver_kwargs = mock_driver.call_args_list[1].kwargs    
+        assert mock_driver_args[0] == [f.min_time[0]+1 for f in fake_tiles]
+        assert mock_driver_args[1] == wkt.loads(polygon_wkt)
+        assert mock_driver_args[2] == args['ds'][0] + '_clim'
+        assert mock_driver_args[3] == tile_service_factory
+        assert mock_driver_kwargs['sc'] == spark_context
+
+        # Ensure the result of the timeseries is as expected
+        json_timeseries_result = json.loads(timeseries_result.toJson())
+
+        # Meta
+        assert len(json_timeseries_result['meta']) == 1
+        assert json_timeseries_result['meta'][0]['shortName'] == args['ds'][0]
+        assert json_timeseries_result['meta'][0]['bounds']['north'] == 31
+        assert json_timeseries_result['meta'][0]['bounds']['east'] == -30.1
+        assert json_timeseries_result['meta'][0]['bounds']['south'] == 29.54
+        assert json_timeseries_result['meta'][0]['bounds']['west'] == -34.98
+        assert json_timeseries_result['meta'][0]['time']['start'] == stats_arr[0][0]['time']
+        assert json_timeseries_result['meta'][0]['time']['stop'] == stats_arr[0][-1]['time']
+
+        # Data
+        for result_data, stats_data in zip(json_timeseries_result['data'][0], stats_arr[0]):
+            month = datetime.utcfromtimestamp(result_data['time']).month
+
+            assert result_data['min'] == stats_data['min']
+            assert result_data['max'] == stats_data['max']
+            assert result_data['mean'] == stats_data['mean']
+            assert result_data['cnt'] == stats_data['cnt']
+            assert result_data['std'] == stats_data['std']
+            assert result_data['time'] == stats_data['time']
+            assert result_data['iso_time'] == stats_data['iso_time']
+            assert result_data['minSeasonal'] == result_data['min'] - clim_stats[month]['min']
+            assert result_data['maxSeasonal'] == result_data['max'] - clim_stats[month]['max']
+            assert result_data['meanSeasonal'] == result_data['mean'] - clim_stats[month]['mean']
+
+        # Stats
+        assert json_timeseries_result['stats'] == {}
+
+def test_calc_seasonal_lowpass_filter(timeseries_args):
+    """
+    Same as test_calc_seasonal_filter, but with the low-pass filter enabled as
+    well. Low-pass assertions are commented out for now - will resume work as
+    needed.
+    """
+    # Mock anything that connects to external dependencies (Solr, Cassandra, ...)
+    tile_service_factory = mock.MagicMock()
+    tile_service = mock.MagicMock()
+    tile_service_factory.return_value = tile_service
+    spark = SparkSession.builder.appName('nexus-analysis').getOrCreate()
+    spark_context = spark.sparkContext
+    request = mock.MagicMock()
+    request.get_argument.return_value = '1,2,3,4,5,6,7,8'
+
+    # Generate the dummy data
+    stats_arr, clim_stats = generate_monthly_stats()
+
+    # Dummy bounding polygon
+    polygon_wkt = 'POLYGON((-34.98 29.54, -30.1 29.54, -30.1 31.00, -34.98 31.00, -34.98 29.54))'
+    
+    # Patch in request arguments
+    args = {
+        'ds': ['test-dataset'],
+        'bounding_polygon': wkt.loads(polygon_wkt),
+        'start_seconds_from_epoch': stats_arr[0][0]['time'],
+        'end_seconds_from_epoch': stats_arr[0][-1]['time'],
+        'apply_seasonal_cycle_filter': True,
+        'apply_low_pass_filter': True,
+        'nparts_requested': 1,
+        'normalize_dates': False,
+    }    
+
+    def generate_fake_tile(tile_id):
+        tile = Tile()
+        tile.tile_id = tile_id
+        tile.min_time = 1627490285,
+        return tile
+
+    # Mock tiles
+    fake_tiles = [generate_fake_tile(idx) for idx in range(10)]
+    tile_service.find_days_in_range_asc.return_value = [f.min_time[0]+1 for f in fake_tiles]
+
+    # Mock result
+    # 'spark_driver' returns results, meta. 
+    # Format of results: [[{stats}, {stats}, ...]]
+    # Format of meta: {}
+    fake_spark_result = list(itertools.chain.from_iterable(stats_arr))
+    fake_spark_result = sorted(fake_spark_result, key=lambda entry: entry["time"]), {}
+
+    timeseries_obj = TimeSeriesSparkHandlerImpl(tile_service_factory=tile_service_factory, sc=spark_context)
+    timeseries_obj.parse_arguments = lambda _: [item for item in args.values()]
+
+    # Mock the spark driver with our fake spark results
+    # Also mock _create_nc_file_time1d to avoid writing the .nc file 
+    with mock.patch('webservice.algorithms_spark.TimeSeriesSpark.spark_driver') as mock_driver, \
+        mock.patch('webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler._create_nc_file_time1d') as mock_create_nc:
+        mock_driver.return_value = fake_spark_result
+        timeseries_result = timeseries_obj.calc(request)
+        
+        # Ensure the call to 'spark_driver' contains the expected params
+
+        # Seasonal filter uses clim, so call_args_list is of length 2
+        assert len(mock_driver.call_args_list) == 2
+
+        # Check data args
+        mock_driver_args = mock_driver.call_args_list[0].args
+        mock_driver_kwargs = mock_driver.call_args_list[0].kwargs    
+        assert mock_driver_args[0] == [f.min_time[0]+1 for f in fake_tiles]
+        assert mock_driver_args[1] == wkt.loads(polygon_wkt)
+        assert mock_driver_args[2] == args['ds'][0]
+        assert mock_driver_args[3] == tile_service_factory
+        assert mock_driver_args[5] == args['normalize_dates']
+        assert mock_driver_kwargs['sc'] == spark_context
+
+        # And clim args
+        mock_driver_args = mock_driver.call_args_list[1].args
+        mock_driver_kwargs = mock_driver.call_args_list[1].kwargs    
+        assert mock_driver_args[0] == [f.min_time[0]+1 for f in fake_tiles]
+        assert mock_driver_args[1] == wkt.loads(polygon_wkt)
+        assert mock_driver_args[2] == args['ds'][0] + '_clim'
+        assert mock_driver_args[3] == tile_service_factory
+        assert mock_driver_kwargs['sc'] == spark_context
+
+        # Ensure the result of the timeseries is as expected
+        json_timeseries_result = json.loads(timeseries_result.toJson())
+
+        # Meta
+        assert len(json_timeseries_result['meta']) == 1
+        assert json_timeseries_result['meta'][0]['shortName'] == args['ds'][0]
+        assert json_timeseries_result['meta'][0]['bounds']['north'] == 31
+        assert json_timeseries_result['meta'][0]['bounds']['east'] == -30.1
+        assert json_timeseries_result['meta'][0]['bounds']['south'] == 29.54
+        assert json_timeseries_result['meta'][0]['bounds']['west'] == -34.98
+        assert json_timeseries_result['meta'][0]['time']['start'] == stats_arr[0][0]['time']
+        assert json_timeseries_result['meta'][0]['time']['stop'] == stats_arr[0][-1]['time']
+
+        # Data
+        for result_data, stats_data in zip(json_timeseries_result['data'][0], stats_arr[0]):
+            month = datetime.utcfromtimestamp(result_data['time']).month
+
+            assert result_data['min'] == stats_data['min']
+            assert result_data['max'] == stats_data['max']
+            assert result_data['mean'] == stats_data['mean']
+            assert result_data['cnt'] == stats_data['cnt']
+            assert result_data['std'] == stats_data['std']
+            assert result_data['time'] == stats_data['time']
+            assert result_data['iso_time'] == stats_data['iso_time']
+            # Clim uses the first-of-the-month values, which are the same as the data
+            assert result_data['minSeasonal'] == result_data['min'] - clim_stats[month]['min']
+            assert result_data['maxSeasonal'] == result_data['max'] - clim_stats[month]['max']
+            assert result_data['meanSeasonal'] == result_data['mean'] - clim_stats[month]['mean']
+
+            # Low pass data
+            # TODO: need to determine expected low pass results
+            # assert result_data['minLowPass']  result_data['min']
+            # assert result_data['maxLowPass']  result_data['max']
+            # assert result_data['meanLowPass']  result_data['mean']
+            # assert result_data['minSeasonalLowPass']  result_data['minSeasonal']
+            # assert result_data['maxSeasonalLowPass']  result_data['maxSeasonal']
+            # assert result_data['meanSeasonalLowPass']  result_data['meanSeasonal']
+
+        # Stats
+        assert json_timeseries_result['stats'] == {}