You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:40:16 UTC

[39/51] [partial] incubator-sdap-nexus git commit: SDAP-1 Import all code under the SDAP SGA

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/climatology1.py
----------------------------------------------------------------------
diff --git a/climatology/clim/climatology1.py b/climatology/clim/climatology1.py
new file mode 100755
index 0000000..940a8d0
--- /dev/null
+++ b/climatology/clim/climatology1.py
@@ -0,0 +1,233 @@
+"""
+climatology.py
+
+Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids.
+
+Simple code to be run on Spark cluster, or using multi-core parallelism on single machine.
+
+"""
+
+import sys, os, calendar, urlparse, urllib
+from datetime import datetime
+import numpy as N
+from variables import getVariables, close
+#from timePartitions import partitionFilesByKey
+from split import fixedSplit
+#from stats import Stats
+from pathos.multiprocessing import ProcessingPool as Pool
+from plotlib import imageMap, makeMovie
+
+#from gaussInterp import gaussInterp
+
+VERBOSE = 1
+
+# Possible execution modes
+# Multicore & cluster modes use pathos pool.map(); Spark mode uses PySpark cluster.
+ExecutionModes = ['sequential', 'multicore', 'cluster', 'spark']
+
+# SST L3m 4.6km Metadata
+# SST calues are scaled integers in degrees Celsius, lat/lon is 4320 x 8640
+# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lat', 'lon']
+
+# Generate algorithmic name for N-day Climatology product
+SSTClimatologyTemplate = 'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc'  #??
+
+# Simple mask and average functions to get us started, then add gaussian interpolation.
+# MODIS L3 SST product, qual_sst is [-1, 2] - =0 is best data, can add =1 for better coverage
+def qcMask(var, mask): return N.ma.array(var, mask=N.ma.make_mask(mask))
+#def qcMask(var, mask): return N.ma.masked_where(mask != 0, var)
+
+def average(a): return N.ma.mean(a, axis=0)
+#AveragingFunctions = {'pixelAverage': average, 'gaussInterp': gaussInterp}
+AveragingFunctions = {'pixelAverage': average, 'gaussInterp': average}
+
+
+def climByAveragingPeriods(urls,              # list of (daily) granule URLs for a long time period (e.g. a year)
+                    nEpochs,                  # compute a climatology for every N epochs (days) by 'averaging'
+                    nWindow,                  # number of epochs in window needed for averaging
+                    variable,                 # name of primary variable in file
+                    mask,                     # name of mask variable
+                    coordinates,              # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
+                    maskFn=qcMask,            # mask function to compute mask from mask variable
+                    averager='pixelAverage',  # averaging function to use, one of ['pixelAverage', 'gaussInterp']
+                    mode='sequential',        # Map across time periods of N-days for concurrent work, executed by:
+                                              # 'sequential' map, 'multicore' using pool.map(), 'cluster' using pathos pool.map(),
+                                              # or 'spark' using PySpark
+                    numNodes=1,               # number of cluster nodes to use
+                    nWorkers=4,               # number of parallel workers per node
+                    averagingFunctions=AveragingFunctions,    # dict of possible averaging functions
+                    legalModes=ExecutionModes  # list of possiblel execution modes
+                   ):
+    '''Compute a climatology every N days by applying a mask and averaging function.
+Writes the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
+***Assumption:  This routine assumes that the N grids will fit in memory.***
+    '''
+    try:
+        averageFn = averagingFunctions[averager]
+    except :
+        averageFn = average
+        print >>sys.stderr, 'climatology: Error, Averaging function must be one of: %s' % str(averagingFunctions)
+
+    urlSplits = [s for s in fixedSplit(urls, nEpochs)]
+    if VERBOSE: print >>sys.stderr, urlSplits
+
+    def climsContoured(urls):
+        n = len(urls)
+        var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn)
+        return contourMap(var, variable, coordinates, n, urls[0])
+
+    if mode == 'sequential':
+        plots = map(climsContoured, urlSplits)
+    elif mode == 'multicore':
+        pool = Pool(nWorkers)
+        plots = pool.map(climsContoured, urlSplits)        
+    elif mode == 'cluster':
+        pass
+    elif mode == 'spark':
+        pass
+
+    plots = map(climsContoured, urlSplits)
+    print plots
+    return plots
+#    return makeMovie(plots, 'clim.mpg')    
+
+
+def climByAveraging(urls,                 # list of granule URLs for a time period
+                    variable,             # name of primary variable in file
+                    mask,                 # name of mask variable
+                    coordinates,          # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
+                    maskFn=qcMask,        # mask function to compute mask from mask variable
+                    averageFn=average     # averaging function to use
+                   ):
+    '''Compute a climatology over N arrays by applying a mask and averaging function.
+Returns the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
+***Assumption:  This routine assumes that the N grids will fit in memory.***
+    '''
+    n = len(urls)
+    varList = [variable, mask]
+    for i, url in enumerate(urls):
+        fn = retrieveFile(url, '~/cache')
+        if VERBOSE: print >>sys.stderr, 'Read variables and mask ...'
+        var, fh = getVariables(fn, varList)            # return dict of variable objects by name
+        if i == 0:
+            dtype = var[variable].dtype
+            shape = (n,) + var[variable].shape
+            accum = N.ma.empty(shape, dtype)
+        
+        v = maskFn(var[variable], var[mask])           # apply quality mask variable to get numpy MA
+#        v = var[variable][:]
+        accum[i] = v                                   # accumulate N arrays for 'averaging'
+        if i+1 != len(urls):                           # keep var dictionary from last file to grab metadata
+            close(fh)                                  # REMEMBER:  closing fh loses in-memory data structures
+
+    if VERBOSE: print >>sys.stderr, 'Averaging ...'
+    coord, fh = getVariables(fn, coordinates)          # read coordinate arrays and add to dict
+    for c in coordinates: var[c] = coord[c][:]
+
+    if averageFn == average:
+        avg = averageFn(accum)                         # call averaging function
+    else:
+        var[variable] = accum
+        if averageFn == gaussInterp:
+            varNames = variable + coordinates
+            avg, vweight, status = \
+                gaussInterp(var, varNames, latGrid, lonGrid, wlat, wlon, slat, slon, stime, vfactor, missingValue)
+        
+    var['attributes'] = var[variable].__dict__         # save attributes of primary variable
+    var[variable] = avg                                # return primary variable & mask arrays in dict
+    var[mask] = N.ma.getmask(avg)
+
+#    close(fh)                                         # Can't close, lose netCDF4.Variable objects, leaking two fh
+    return var
+
+
+def contourMap(var, variable, coordinates, n, url):
+    p = urlparse.urlparse(url)
+    filename = os.path.split(p.path)[1]
+    return filename
+    outFile = filename + '.png'
+    # Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays
+    vals = var[variable][:]
+
+    # Fixed color scale, write file, turn off auto borders, set title, reverse lat direction so monotonically increasing??
+    imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:],
+             vmin=-2., vmax=45., outFile=outFile, autoBorders=False,
+             title='%s %d-day Mean from %s' % (variable.upper(), n, filename))
+    print >>sys.stderr, 'Writing contour plot to %s' % outFile
+    return outFile
+
+
+def isLocalFile(url):
+    '''Check if URL is a local path.'''
+    u = urlparse.urlparse(url)
+    if u.scheme == '' or u.scheme == 'file':
+        if not path.exists(u.path):
+            print >>sys.stderr, 'isLocalFile: File at local path does not exist: %s' % u.path
+        return (True, u.path)
+    else:
+        return (False, u.path)
+
+
+def retrieveFile(url, dir=None):
+    '''Retrieve a file from a URL, or if it is a local path then verify it exists.'''
+    if dir is None: dir = './'
+    ok, path = isLocalFile(url)
+    fn = os.path.split(path)[1]
+    outPath = os.path.join(dir, fn)
+    if not ok:
+        if os.path.exists(outPath):
+            print >>sys.stderr, 'retrieveFile: Using cached file: %s' % outPath
+        else:
+            try:
+                print >>sys.stderr, 'retrieveFile: Retrieving (URL) %s to %s' % (url, outPath)
+                urllib.urlretrieve(url, outPath)
+            except:
+                print >>sys.stderr, 'retrieveFile: Cannot retrieve file at URL: %s' % url
+                return None
+    return outPath    
+
+
+def dailyFile2date(path, offset=1):
+    '''Convert YYYYDOY string in filename to date.'''
+    fn = os.path.split(path)[1]
+    year = int(fn[offset:offset+4])
+    doy = int(fn[offset+5:offset+8])
+    return fn[5:15].replace('.', '/')
+
+
+def formatRegion(r):
+    """Format lat/lon region specifier as string suitable for file name."""
+    if isinstance(r, str):
+        return r
+    else:
+        strs = [str(i).replace('-', 'm') for i in r]
+        return 'region-%s-%sby%s-%s' % tuple(strs)
+
+
+def formatGrid(r):
+    """Format lat/lon grid resolution specifier as string suitable for file name."""
+    if isinstance(r, str):
+        return r
+    else:
+        return str(r[0]) + 'by' + str(r[1])
+
+
+def main(args):
+    nEpochs = int(args[0])
+    nWindow = int(args[1])
+    averager = args[2]
+    mode = args[3]
+    nWorkers = int(args[4])
+    urlFile = args[5]
+    urls = [s.strip() for s in open(urlFile, 'r')]
+    return climByAveragingPeriods(urls, nEpochs, nWindow, 'sst', 'qual_sst', ['lat', 'lon'],
+                                  averager=averager, mode=mode, nWorkers=nWorkers)
+
+    
+if __name__ == '__main__':
+    print main(sys.argv[1:])
+
+
+# python climatology.py 5 5 pixelAverage sequential 1 urls_sst_10days.txt
+# python climatology.py 5 5 gaussianInterp multicore 8 urls_sst_40days.txt
+

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/climatology2.py
----------------------------------------------------------------------
diff --git a/climatology/clim/climatology2.py b/climatology/clim/climatology2.py
new file mode 100755
index 0000000..231efb7
--- /dev/null
+++ b/climatology/clim/climatology2.py
@@ -0,0 +1,453 @@
+"""
+climatology2.py
+
+Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids.
+
+Simple code to be run on Spark cluster, or using multi-core parallelism on single machine.
+
+"""
+
+import sys, os, urlparse, urllib, re, time
+import numpy as N
+import matplotlib
+matplotlib.use('Agg')
+import matplotlib.pylab as M
+
+from variables import getVariables, close
+from cache import retrieveFile, CachePath
+from split import fixedSplit, splitByNDays
+from netCDF4 import Dataset, default_fillvals
+from pathos.multiprocessing import ProcessingPool as Pool
+from plotlib import imageMap, makeMovie
+
+from spatialFilter import spatialFilter
+from gaussInterp import gaussInterp         # calls into Fortran version gaussInterp_f.so
+#from gaussInterp_slow import gaussInterp_slow as gaussInterp       # pure python, slow debuggable version
+
+#import pyximport; pyximport.install(pyimport=False)
+#from gaussInterp import gaussInterp, gaussInterp_      # attempted cython version, had issues
+
+VERBOSE = 1
+
+# Possible execution modes
+# Multicore & cluster modes use pathos pool.map(); Spark mode uses PySpark cluster.
+ExecutionModes = ['sequential', 'multicore', 'cluster', 'spark']
+
+# SST L3m 4.6km Metadata
+# SST calues are scaled integers in degrees Celsius, lon/lat is 8640 x 4320
+# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lon', 'lat']
+
+# Generate algorithmic name for N-day Climatology product
+SSTClimatologyTemplate = 'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc'  #??
+
+# Simple mask and average functions to get us started, then add gaussian interpolation.
+# MODIS L3 SST product, qual_sst is [-1, 2] - =0 is best data, can add =1 for better coverage
+#def qcMask(var, mask): return N.ma.array(var, mask=N.ma.make_mask(mask == 0))
+
+def qcMask(var, mask): return N.ma.masked_where(mask != 0, var)
+#def qcMask(var, mask): return N.ma.masked_where(mask < 0, var)
+
+def splitModisSst(seq, n):
+    for chunk in splitByNDays(seq, n, re.compile(r'(...).L3m'), keyed=False):
+        yield chunk
+
+def mean(a): return N.ma.mean(a, axis=0)
+
+AveragingFunctions = {'pixelMean': mean, 'gaussInterp': gaussInterp, 'spatialFilter': spatialFilter}
+
+PixelMeanConfig = {'name': 'pixelMean'}
+
+GaussInterpConfig = {'name': 'gaussInterp',
+                     'latGrid': None, 'lonGrid': None,         # None means use input lat/lon grid
+                     'wlat': 3, 'wlon': 3,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+GaussInterpConfig1a = {'name': 'gaussInterp',
+                     'latGrid': None, 'lonGrid': None,         # None means use input lat/lon grid
+                     'wlat': 0.30, 'wlon': 0.30,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+GaussInterpConfig1b = {'name': 'gaussInterp',
+                     'latGrid': None, 'lonGrid': None,         # None means use input lat/lon grid
+                     'wlat': 0.08, 'wlon': 0.08,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+GaussInterpConfig2 = {'name': 'gaussInterp',
+                     'latGrid': (89.5, -89.5, -0.25), 'lonGrid': (-180., 179., 0.25),
+                     'wlat': 2., 'wlon': 2.,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+FilterGaussian = [[1, 2, 1], [2, 4, 2], [1, 2, 1]]    # divide by 16
+FilterLowPass = [[1, 1, 1], [1, 1, 1], [1, 1, 1]]     # divide by 9
+
+SpatialFilterConfig1 = {'name': 'spatialFilter', 'normalization': 16., 
+                        'spatialFilter': N.array(FilterGaussian, dtype=N.int32),
+                        'missingValue': default_fillvals['f4']}
+
+SpatialFilterConfig2 = {'name': 'spatialFilter', 'normalization': 9.,
+                        'spatialFilter': N.array(FilterLowPass, dtype=N.int32),
+                        'missingValue': default_fillvals['f4']}
+
+
+
+def climByAveragingPeriods(urls,              # list of (daily) granule URLs for a long time period (e.g. a year)
+                    nEpochs,                  # compute a climatology for every N epochs (days) by 'averaging'
+                    nWindow,                  # number of epochs in window needed for averaging
+                    nNeighbors,               # number of neighbors on EACH side in lat/lon directions to use in averaging
+                    variable,                 # name of primary variable in file
+                    mask,                     # name of mask variable
+                    coordinates,              # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
+                    splitFn=splitModisSst,    # split function to use to partition the input URL list
+                    maskFn=qcMask,            # mask function to compute mask from mask variable
+                    averager='pixelMean',     # averaging function to use, one of ['pixelMean', 'gaussInterp', 'spatialFilter']
+                    averagingConfig={},       # dict of parameters to control the averaging function (e.g. gaussInterp)
+                    optimization='fortran',   # optimization mode (fortran or cython)
+                    mode='sequential',        # Map across time periods of N-days for concurrent work, executed by:
+                                              # 'sequential' map, 'multicore' using pool.map(), 'cluster' using pathos pool.map(),
+                                              # or 'spark' using PySpark
+                    numNodes=1,               # number of cluster nodes to use
+                    nWorkers=4,               # number of parallel workers per node
+                    averagingFunctions=AveragingFunctions,    # dict of possible averaging functions
+                    legalModes=ExecutionModes,  # list of possible execution modes
+                    cachePath=CachePath       # directory to cache retrieved files in
+                   ):
+    '''Compute a climatology every N days by applying a mask and averaging function.
+Writes the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
+***Assumption:  This routine assumes that the N grids will fit in memory.***
+    '''
+    if averagingConfig['name'] == 'gaussInterp':
+        averagingConfig['wlat'] = nNeighbors
+        averagingConfig['wlon'] = nNeighbors
+    try:
+        averageFn = averagingFunctions[averager]
+    except:
+        print >>sys.stderr, 'climatology: Error, Averaging function must be one of: %s' % str(averagingFunctions)
+        sys.exit(1)
+
+    urlSplits = [s for s in splitFn(urls, nEpochs)]
+
+    def climsContoured(urls, plot=None, fillValue=default_fillvals['f4'], format='NETCDF4', cachePath=cachePath):
+        n = len(urls)
+        if VERBOSE: print >>sys.stderr, urls
+
+        var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn, averagingConfig, optimization, cachePath)
+
+        fn = os.path.split(urls[0])[1]
+        inFile = os.path.join(cachePath, fn)
+        method = averagingConfig['name']
+        fn = os.path.splitext(fn)[0]
+        day = fn[5:8]
+        nDays = int(var['time'][0])
+
+        if 'wlat' in averagingConfig:
+            wlat = averagingConfig['wlat']
+        else:
+            wlat = 1
+        if int(wlat) == wlat:
+            outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%dnbrs.nc' % (day, nDays, method, int(wlat))    # mark each file with first day in period
+        else:
+            outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%4.2fnbrs.nc' % (day, nDays, method, wlat)    # mark each file with first day in period
+
+        outFile = writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue, format)
+
+        if plot == 'contour':
+            figFile = contourMap(var, variable, coordinates, n, outFile)
+        elif plot == 'histogram':
+#            figFile = histogram(var, variable, n, outFile)
+            figFile = None
+        else:
+            figFile = None
+        return (outFile, figFile)
+
+    if mode == 'sequential':
+        results = map(climsContoured, urlSplits)
+    elif mode == 'multicore':
+        pool = Pool(nWorkers)
+        results = pool.map(climsContoured, urlSplits)        
+    elif mode == 'cluster':
+        pass
+    elif mode == 'spark':
+        pass
+
+    return results
+
+
+def climByAveraging(urls,                    # list of granule URLs for a time period
+                    variable,                # name of primary variable in file
+                    mask,                    # name of mask variable
+                    coordinates,             # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
+                    maskFn=qcMask,           # mask function to compute mask from mask variable
+                    averageFn=mean,          # averaging function to use
+                    averagingConfig={},      # parameters to control averaging function (e.g. gaussInterp)
+                    optimization='fortran',  # optimization mode (fortran or cython)
+                    cachePath=CachePath
+                   ):
+    '''Compute a climatology over N arrays by applying a mask and averaging function.
+Returns the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
+***Assumption:  This routine assumes that the N grids will fit in memory.***
+    '''
+    n = len(urls)
+    varList = [variable, mask]
+    var = {}
+    vtime = N.zeros((n,), N.int32)
+
+    for i, url in enumerate(urls):
+        try:
+            path = retrieveFile(url, cachePath)
+            fn = os.path.split(path)[1]
+            vtime[i] = int(fn[5:8])     # KLUDGE: extract DOY from filename
+        except:
+            print >>sys.stderr, 'climByAveraging: Error, continuing without file %s' % url
+            accum[i] = emptyVar
+            continue
+        if path is None: continue
+        print >>sys.stderr, 'Read variables and mask ...'
+        try:
+            var, fh = getVariables(path, varList, arrayOnly=True, order='F', set_auto_mask=False)   # return dict of variable objects by name
+        except:
+            print >>sys.stderr, 'climByAveraging: Error, cannot read file %s' % path
+            accum[i] = emptyVar
+            continue
+        if i == 0:
+            dtype = var[variable].dtype
+            if 'int' in dtype.name: dtype = N.float32
+            shape = (n,) + var[variable].shape
+            accum = N.ma.empty(shape, dtype, order='F')
+            emptyVar = N.array(N.ma.masked_all(var[variable].shape, dtype), order='F')  # totally masked variable array for missing or bad file reads
+
+            print >>sys.stderr, 'Read coordinates ...'
+            var, fh = getVariables(path, coordinates, var, arrayOnly=True, order='F')   # read coordinate arrays and add to dict
+
+        var[variable] = maskFn(var[variable], var[mask])     # apply quality mask variable to get numpy MA, turned off masking done by netCDF4 library
+#       var[variable] = var[variable][:]
+
+        # Echo variable range for sanity check
+        vals = var[variable].compressed()
+        print >>sys.stderr, 'Variable Range: min, max:', vals.min(), vals.max()
+
+        # Plot input grid
+#        figFile = histogram(vals, variable, n, os.path.split(path)[1])
+#        figFile = contourMap(var, variable, coordinates[1:], n, os.path.split(path)[1])
+
+        accum[i] = var[variable]                        # accumulate N arrays for 'averaging"""
+#        if i != 0 and i+1 != n: close(fh)              # REMEMBER: closing fh loses netCDF4 in-memory data structures
+        close(fh)
+
+    coordinates = ['time'] + coordinates               # add constructed time (days) as first coordinate
+    var['time'] = vtime
+
+    if averagingConfig['name'] == 'pixelMean':
+        print >>sys.stderr, 'Doing Pixel Average over %d grids ...' % n
+        start = time.time()
+        avg = averageFn(accum)                         # call pixel averaging function
+        end = time.time()
+        print >>sys.stderr, 'pixelMean execution time:', (end - start)
+        outlat = var[coordinates[1]].astype(N.float32)[:]
+        outlon = var[coordinates[2]].astype(N.float32)[:]
+    elif averagingConfig['name'] == 'gaussInterp':
+        print >>sys.stderr, 'Doing Gaussian Interpolation over %d grids ...' % n
+        var[variable] = accum
+        c = averagingConfig
+        latGrid = c['latGrid']; lonGrid = c['lonGrid']
+        if latGrid is not None and lonGrid is not None:
+            outlat = N.arange(latGrid[0], latGrid[1]+latGrid[2], latGrid[2], dtype=N.float32, order='F')
+            outlon = N.arange(lonGrid[0], lonGrid[1]+lonGrid[2], lonGrid[2], dtype=N.float32, order='F')
+        else:
+            outlat = N.array(var[coordinates[1]], dtype=N.float32, order='F')
+            outlon = N.array(var[coordinates[2]], dtype=N.float32, order='F')
+        varNames = [variable] + coordinates
+        start = time.time()
+        avg, weight, status = \
+            gaussInterp(var, varNames, outlat, outlon, c['wlat'], c['wlon'],
+                        c['slat'], c['slon'], c['stime'], c['vfactor'], c['missingValue'],
+                        VERBOSE, optimization)
+        end = time.time()
+        var['outweight'] = weight.astype(N.float32)
+        print >>sys.stderr, 'gaussInterp execution time:', (end - start)
+    elif averagingConfig['name'] == 'spatialFilter':
+        print >>sys.stderr, 'Applying Spatial 3x3 Filter and then averaging over %d grids ...' % n
+        var[variable] = accum
+        c = averagingConfig
+        varNames = [variable] + coordinates
+        start = time.time()
+        avg, count, status = \
+            spatialFilter(var, varNames, c['spatialFilter'], c['normalization'], 
+                          c['missingValue'], VERBOSE, optimization)
+        end = time.time()
+        print >>sys.stderr, 'spatialFilter execution time:', (end - start)
+        outlat = var[coordinates[1]].astype(N.float32)[:]
+        outlon = var[coordinates[2]].astype(N.float32)[:]
+
+    var['out'+variable] = avg.astype(N.float32)            # return primary variable & mask arrays in dict
+    var['out'+mask] = N.ma.getmask(avg)
+    var['outlat'] = outlat
+    var['outlon'] = outlon
+    return var
+
+
+def writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue=None, format='NETCDF4'):
+    '''Construct output bundle of arrays with NetCDF dimensions and attributes.
+Output variables and attributes will have same names as the input file.
+    '''
+    din = Dataset(inFile, 'r')
+    dout = Dataset(outFile, 'w', format=format)
+    print >>sys.stderr, 'Writing %s ...' % outFile
+
+    # Transfer global attributes from input file
+    for a in din.ncattrs():
+        dout.setncattr(a, din.getncattr(a))
+
+    # Add dimensions and variables, copying data
+    coordDim = [dout.createDimension(coord, var['out'+coord].shape[0]) for coord in coordinates]    # here output lon, lat, etc.
+    for coord in coordinates:
+        v = dout.createVariable(coord, var['out'+coord].dtype, (coord,))
+        v[:] = var['out'+coord][:]
+    primaryVar = dout.createVariable(variable, var['out'+variable].dtype, coordinates, fill_value=fillValue)
+    primaryVar[:] = var['out'+variable][:]          # transfer array
+    maskVar = dout.createVariable(mask, 'i1', coordinates)
+    maskVar[:] = var['out'+mask].astype('i1')[:]
+
+    # Transfer variable attributes from input file
+    for k,v in dout.variables.iteritems():
+        for a in din.variables[k].ncattrs():
+            if a == 'scale_factor' or a == 'add_offset' or a == '_FillValue': continue
+            v.setncattr(a, din.variables[k].getncattr(a))
+        if k == variable:
+            try:
+#                if fillValue == None: fillValue = din.variables[k].getncattr('_FillValue')        # total kludge
+                if fillValue == None: fillValue = default_fillvals['f4']
+#                print >>sys.stderr, default_fillvals
+#                v.setncattr('_FillValue', fillValue)         # set proper _FillValue for climatology array
+                v.setncattr('missing_value', fillValue)
+                print >>sys.stderr, 'Setting missing_value for primary variable %s to %f' % (variable, fillValue)
+            except:
+                print >>sys.stderr, 'writeOutNetcdfVars: Warning, for variable %s no fill value specified or derivable from inputs.' % variable
+    din.close()
+    dout.close()
+    return outFile
+    
+
+def contourMap(var, variable, coordinates, n, outFile):
+    figFile = os.path.splitext(outFile)[0] + '_hist.png'
+    # TODO: Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays
+    vals = var[variable][:]
+
+    # Fixed color scale, write file, turn off auto borders, set title, reverse lat direction so monotonically increasing??
+    imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:],
+             vmin=-2., vmax=45., outFile=figFile, autoBorders=False,
+             title='%s %d-day Mean from %s' % (variable.upper(), n, outFile))
+    print >>sys.stderr, 'Writing contour plot to %s' % figFile
+    return figFile
+
+
+def histogram(vals, variable, n, outFile):
+    figFile = os.path.splitext(outFile)[0] + '_hist.png'
+    M.clf()
+#    M.hist(vals, 47, (-2., 45.))
+    M.hist(vals, 94)
+    M.xlim(-5, 45)
+    M.xlabel('SST in degrees Celsius')
+    M.ylim(0, 300000)
+    M.ylabel('Count')
+    M.title('Histogram of %s %d-day Mean from %s' % (variable.upper(), n, outFile))
+    M.show()
+    print >>sys.stderr, 'Writing histogram plot to %s' % figFile
+    M.savefig(figFile)
+    return figFile
+
+
+def dailyFile2date(path, offset=1):
+    '''Convert YYYYDOY string in filename to date.'''
+    fn = os.path.split(path)[1]
+    year = int(fn[offset:offset+4])
+    doy = int(fn[offset+5:offset+8])
+    return fn[5:15].replace('.', '/')
+
+
+def formatRegion(r):
+    """Format lat/lon region specifier as string suitable for file name."""
+    if isinstance(r, str):
+        return r
+    else:
+        strs = [str(i).replace('-', 'm') for i in r]
+        return 'region-%s-%sby%s-%s' % tuple(strs)
+
+
+def formatGrid(r):
+    """Format lat/lon grid resolution specifier as string suitable for file name."""
+    if isinstance(r, str):
+        return r
+    else:
+        return str(r[0]) + 'by' + str(r[1])
+
+
+def main(args):
+    nEpochs = int(args[0])
+    nWindow = int(args[1])
+    nNeighbors = int(args[2])
+    averager = args[3]
+    optimization = args[4]
+    mode = args[5]
+    nWorkers = int(args[6])
+    urlFile = args[7]
+    urls = [s.strip() for s in open(urlFile, 'r')]
+    if averager == 'gaussInterp':
+        results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
+                             averager=averager, optimization=optimization, averagingConfig=GaussInterpConfig,
+                             mode=mode, nWorkers=nWorkers)
+    elif averager == 'spatialFilter':
+        results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
+                             averager=averager, optimization=optimization, averagingConfig=SpatialFilterConfig1,
+                             mode=mode, nWorkers=nWorkers)
+    elif averager == 'pixelMean':
+        results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
+                             averager=averager, optimization=optimization, averagingConfig=PixelMeanConfig,
+                             mode=mode, nWorkers=nWorkers)
+    else:
+        print >>sys.stderr, 'climatology2: Error, averager must be one of', AveragingFunctions.keys()
+        sys.exit(1)
+    
+    if results[0][1] is not None:
+        makeMovie([r[1] for r in results], 'clim.mpg')    
+    return results
+
+    
+if __name__ == '__main__':
+    print main(sys.argv[1:])
+
+
+# Old Tests:
+# python climatology2.py 5 5 0 pixelMean   fortran sequential 1 urls_sst_10days.txt
+# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_10days.txt
+
+# python climatology2.py 5 5 0 pixelMean   fortran sequential 1 urls_sst_40days.txt
+# python climatology2.py 5 5 0 pixelMean   fortran multicore  8 urls_sst_40days.txt
+# python climatology2.py 5 5 3 gaussInterp fortran multicore  8 urls_sst_40days.txt
+
+# Old Production:
+# python climatology2.py 5 5 0 pixelMean     fortran multicore  16 urls_sst_2015.txt  >& log &
+# python climatology2.py 10 10 0 pixelMean     fortran multicore  16 urls_sst_2015.txt  >& log &
+# python climatology2.py 5 5 3 gaussInterp   fortran multicore  16 urls_sst_2015.txt  >& log &
+
+
+# Tests:
+# python climatology2.py 5 5 0 pixelMean   fortran sequential 1 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 0 pixelMean   fortran multicore  4 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 3 gaussInterp fortran multicore  4 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 7 1 spatialFilter fortran sequential 1 urls_sst_daynight_5days_sorted.txt
+
+# Test number of neighbors needed:
+# python climatology2.py 5 7 3 gaussInterp fortran multicore  4 urls_sst_daynight_20days_sorted.txt
+
+
+# Production:
+# python climatology2.py 5 7 0 pixelMean fortran multicore  4 urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 3 gaussInterp fortran sequential 1 urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 3 gaussInterp fortran multicore  4 urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 1 spatialFilter fortran multicore  4 urls_sst_daynight_2003_2015_sorted.txt
+

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/climatology3Spark.py
----------------------------------------------------------------------
diff --git a/climatology/clim/climatology3Spark.py b/climatology/clim/climatology3Spark.py
new file mode 100755
index 0000000..14a5f59
--- /dev/null
+++ b/climatology/clim/climatology3Spark.py
@@ -0,0 +1,418 @@
+"""
+climatology3Spark.py
+
+Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids.
+
+Simple code to be run on Spark cluster, or using multi-core parallelism on single machine.
+
+"""
+
+import sys, os, urlparse, urllib, re, time
+import numpy as N
+import matplotlib
+matplotlib.use('Agg')
+import matplotlib.pylab as M
+
+from variables import getVariables, close
+from cache import retrieveFile, CachePath
+from split import fixedSplit, splitByNDays
+from netCDF4 import Dataset, default_fillvals
+from plotlib import imageMap, makeMovie
+
+from spatialFilter import spatialFilter
+from gaussInterp import gaussInterp         # calls into Fortran version gaussInterp_f.so
+#from gaussInterp_slow import gaussInterp_slow as gaussInterp       # pure python, slow debuggable version
+
+VERBOSE = 1
+
+# Possible execution modes
+ExecutionModes = ['multicore', 'spark']
+
+# SST L3m 4.6km Metadata
+# SST calues are scaled integers in degrees Celsius, lon/lat is 8640 x 4320
+# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lon', 'lat']
+
+# Generate algorithmic name for N-day Climatology product
+SSTClimatologyTemplate = 'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc'  #??
+
+# Simple mask and average functions to get us started, then add gaussian interpolation.
+# MODIS L3 SST product, qual_sst is [-1, 2] - =0 is best data, can add =1 for better coverage
+#def qcMask(var, mask): return N.ma.array(var, mask=N.ma.make_mask(mask == 0))
+
+def qcMask(var, mask): return N.ma.masked_where(mask != 0, var)
+#def qcMask(var, mask): return N.ma.masked_where(mask < 0, var)
+
+def splitModisSst(seq, n):
+    for chunk in splitByNDays(seq, n, re.compile(r'(...).L3m')):
+        yield chunk
+
+AveragingFunctions = {'pixelMean': mean, 'gaussInterp': gaussInterp, 'spatialFilter': spatialFilter}
+
+PixelMeanConfig = {'name': 'pixelMean'}
+
+GaussInterpConfig = {'name': 'gaussInterp',
+                     'latGrid': None, 'lonGrid': None,         # None means use input lat/lon grid
+                     'wlat': 3, 'wlon': 3,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+GaussInterpConfig1a = {'name': 'gaussInterp',
+                     'latGrid': None, 'lonGrid': None,         # None means use input lat/lon grid
+                     'wlat': 0.30, 'wlon': 0.30,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+GaussInterpConfig1b = {'name': 'gaussInterp',
+                     'latGrid': None, 'lonGrid': None,         # None means use input lat/lon grid
+                     'wlat': 0.08, 'wlon': 0.08,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+GaussInterpConfig2 = {'name': 'gaussInterp',
+                     'latGrid': (89.5, -89.5, -0.25), 'lonGrid': (-180., 179., 0.25),
+                     'wlat': 2., 'wlon': 2.,
+                     'slat': 0.15, 'slon': 0.15, 'stime': 1,
+                     'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+FilterGaussian = [[1, 2, 1], [2, 4, 2], [1, 2, 1]]    # divide by 16
+FilterLowPass = [[1, 1, 1], [1, 1, 1], [1, 1, 1]]     # divide by 9
+
+SpatialFilterConfig1 = {'name': 'spatialFilter', 'normalization': 16., 
+                        'spatialFilter': N.array(FilterGaussian, dtype=N.int32),
+                        'missingValue': default_fillvals['f4']}
+
+SpatialFilterConfig2 = {'name': 'spatialFilter', 'normalization': 9.,
+                        'spatialFilter': N.array(FilterLowPass, dtype=N.int32),
+                        'missingValue': default_fillvals['f4']}
+
+# Directory to cache retrieved files in
+CachePath = '~/cache'
+
+
+
+def climByAveragingPeriods(urls,              # list of (daily) granule URLs for a long time period (e.g. a year)
+                    nEpochs,                  # compute a climatology for every N epochs (days) by 'averaging'
+                    nWindow,                  # number of epochs in window needed for averaging
+                    variable,                 # name of primary variable in file
+                    mask,                     # name of mask variable
+                    coordinates,              # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
+                    splitFn=splitModisSst,    # split function to use to partition the input URL list
+                    maskFn=qcMask,            # mask function to compute mask from mask variable
+                    averager='pixelMean',     # averaging function to use, one of ['pixelMean', 'gaussInterp', 'spatialFilter']
+                    averagingConfig={},       # dict of parameters to control the averaging function (e.g. gaussInterp)
+                    optimization='fortran',   # optimization mode (fortran or cython)
+                    mode='multicore',         # Map across time periods of N-days for concurrent work, executed by:
+                                              # 'multicore' using pysparkling or 'spark' using Spark/Mesos cluster
+                    numNodes=1,               # number of cluster nodes to use
+                    nWorkers=4,               # number of parallel workers per node
+                    averagingFunctions=AveragingFunctions,    # dict of possible averaging functions
+                    legalModes=ExecutionModes,  # list of possible execution modes
+                    cachePath=CachePath       # directory to cache retrieved files in
+                   ):
+    '''Compute a climatology every N days by applying a mask and averaging function.
+Writes the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
+***Assumption:  This routine assumes that the N grids will fit in memory.***
+    '''
+    if averagingConfig['name'] == 'gaussInterp':
+        averagingConfig['wlat'] = nNeighbors
+        averagingConfig['wlon'] = nNeighbors
+    try:
+        averageFn = averagingFunctions[averager]
+    except:
+        print >>sys.stderr, 'climatology: Error, Averaging function must be one of: %s' % str(averagingFunctions)
+        sys.exit(1)
+
+    urlSplits = [s for s in splitFn(urls, nEpochs)]
+
+    if mode == 'multicore':
+        pass
+    elif mode == 'spark':
+        pass
+
+    def climsContoured(urls, plot=None, fillValue=default_fillvals['f4'], format='NETCDF4', cachePath=cachePath):
+        n = len(urls)
+        if VERBOSE: print >>sys.stderr, urls
+
+        var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn, averagingConfig, optimization, cachePath)
+
+        fn = os.path.split(urls[0])[1]
+        inFile = os.path.join(cachePath, fn)
+        method = averagingConfig['name']
+        fn = os.path.splitext(fn)[0]
+        day = fn[5:8]
+        nDays = int(var['time'][0])
+
+        if 'wlat' in averagingConfig:
+            wlat = averagingConfig['wlat']
+        else:
+            wlat = 1
+        if int(wlat) == wlat:
+            outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%dnbrs.nc' % (day, nDays, method, int(wlat))    # mark each file with first day in period
+        else:
+            outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%4.2fnbrs.nc' % (day, nDays, method, wlat)    # mark each file with first day in period
+
+        outFile = writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue, format)
+
+        if plot == 'contour':
+            figFile = contourMap(var, variable, coordinates, n, outFile)
+        elif plot == 'histogram':
+#            figFile = histogram(var, variable, n, outFile)
+            figFile = None
+        else:
+            figFile = None
+        return (outFile, figFile)
+
+
+    return results
+
+
+def accumulateClim(urls,                    # list of granule URLs for a time period
+                   variable,                # name of primary variable in file
+                   mask,                    # name of mask variable
+                   coordinates,             # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
+                   maskFn=qcMask,           # mask function to compute mask from mask variable
+                   averageFn=mean,          # averaging function to use
+                   averagingConfig={},      # parameters to control averaging function (e.g. gaussInterp)
+                   optimization='fortran',  # optimization mode (fortran or cython)
+                   cachePath=CachePath
+                  ):
+    '''Compute a climatology over N arrays by applying a mask and averaging function.
+Returns the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
+***Assumption:  This routine assumes that the N grids will fit in memory.***
+    '''
+    print >>sys.stderr, 'accumulateClim: Doing %s ...' % averagingConfig['name']
+    varList = [variable, mask]
+    for i, url in enumerate(urls):
+        try:
+            path = retrieveFile(url, cachePath)
+            fn = os.path.split(path)[1]
+            vtime = int(fn[5:8])     # KLUDGE: extract DOY from filename
+        except:
+            print >>sys.stderr, 'climByAveraging: Error, continuing without file %s' % url
+            continue
+        if path is None: continue
+        try:
+            print >>sys.stderr, 'Reading file %s ...' % path
+            var, fh = getVariables(path, varList, arrayOnly=True, order='F', set_auto_mask=False)   # return dict of variable objects by name
+        except:
+            print >>sys.stderr, 'climByAveraging: Error, cannot read file %s' % path
+            continue
+        if i == 0:
+            dtype = var[variable].dtype
+            if 'int' in dtype.name: dtype = N.float32
+            shape = var[variable].shape
+            vsum = N.ma.empty(shape, dtype, order='F')
+            vcount = N.ma.empty(shape, dtype, order='F')
+            emptyVar = N.array(N.ma.masked_all(var[variable].shape, dtype), order='F')  # totally masked variable array for missing or bad file reads
+
+            print >>sys.stderr, 'Read coordinates ...'
+            var, fh = getVariables(path, coordinates, var, arrayOnly=True, order='F')   # read coordinate arrays and add to dict
+
+        var[variable] = maskFn(var[variable], var[mask])     # apply quality mask variable to get numpy MA, turned off masking done by netCDF4 library
+
+        # Echo variable range for sanity check
+        vals = var[variable].compressed()
+        print >>sys.stderr, 'Variable Range: min, max:', vals.min(), vals.max()
+
+        if averagingConfig['name'] == 'pixelMean':
+            vsum += var[variable]                        # update accumulators
+            vcount += ~var[mask]
+
+        elif averagingConfig['name'] == 'gaussInterp':
+            var[variable] = accum
+            c = averagingConfig
+            latGrid = c['latGrid']; lonGrid = c['lonGrid']
+            if latGrid is not None and lonGrid is not None:
+                outlat = N.arange(latGrid[0], latGrid[1]+latGrid[2], latGrid[2], dtype=N.float32, order='F')
+                outlon = N.arange(lonGrid[0], lonGrid[1]+lonGrid[2], lonGrid[2], dtype=N.float32, order='F')
+            else:
+                outlat = N.array(var[coordinates[1]], dtype=N.float32, order='F')
+                outlon = N.array(var[coordinates[2]], dtype=N.float32, order='F')
+            varNames = [variable] + coordinates
+            start = time.time()
+            avg, weight, status = \
+                gaussInterp(var, varNames, outlat, outlon, c['wlat'], c['wlon'],
+                            c['slat'], c['slon'], c['stime'], c['vfactor'], c['missingValue'],
+                            VERBOSE, optimization)
+            end = time.time()
+            vcount = weight.astype(N.float32)
+            vsum = avg
+            print >>sys.stderr, 'gaussInterp execution time:', (end - start)
+        elif averagingConfig['name'] == 'spatialFilter':
+            var[variable] = accum
+            c = averagingConfig
+            varNames = [variable] + coordinates
+            start = time.time()
+            avg, vcount, status = \
+                spatialFilter(var, varNames, c['spatialFilter'], c['normalization'], 
+                              c['missingValue'], VERBOSE, optimization)
+            vsum = avg
+            end = time.time()
+            print >>sys.stderr, 'spatialFilter execution time:', (end - start)
+        close(fh)
+
+    return (vcount, vsum)
+
+
+def writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue=None, format='NETCDF4'):
+    '''Construct output bundle of arrays with NetCDF dimensions and attributes.
+Output variables and attributes will have same names as the input file.
+    '''
+    din = Dataset(inFile, 'r')
+    dout = Dataset(outFile, 'w', format=format)
+    print >>sys.stderr, 'Writing %s ...' % outFile
+
+    # Transfer global attributes from input file
+    for a in din.ncattrs():
+        dout.setncattr(a, din.getncattr(a))
+
+    # Add dimensions and variables, copying data
+    coordDim = [dout.createDimension(coord, var['out'+coord].shape[0]) for coord in coordinates]    # here output lon, lat, etc.
+    for coord in coordinates:
+        v = dout.createVariable(coord, var['out'+coord].dtype, (coord,))
+        v[:] = var['out'+coord][:]
+    primaryVar = dout.createVariable(variable, var['out'+variable].dtype, coordinates, fill_value=fillValue)
+    primaryVar[:] = var['out'+variable][:]          # transfer array
+    maskVar = dout.createVariable(mask, 'i1', coordinates)
+    maskVar[:] = var['out'+mask].astype('i1')[:]
+
+    # Transfer variable attributes from input file
+    for k,v in dout.variables.iteritems():
+        for a in din.variables[k].ncattrs():
+            if a == 'scale_factor' or a == 'add_offset' or a == '_FillValue': continue
+            v.setncattr(a, din.variables[k].getncattr(a))
+        if k == variable:
+            try:
+#                if fillValue == None: fillValue = din.variables[k].getncattr('_FillValue')        # total kludge
+                if fillValue == None: fillValue = default_fillvals['f4']
+#                print >>sys.stderr, default_fillvals
+#                v.setncattr('_FillValue', fillValue)         # set proper _FillValue for climatology array
+                v.setncattr('missing_value', fillValue)
+                print >>sys.stderr, 'Setting missing_value for primary variable %s to %f' % (variable, fillValue)
+            except:
+                print >>sys.stderr, 'writeOutNetcdfVars: Warning, for variable %s no fill value specified or derivable from inputs.' % variable
+    din.close()
+    dout.close()
+    return outFile
+    
+
+def contourMap(var, variable, coordinates, n, outFile):
+    figFile = os.path.splitext(outFile)[0] + '_hist.png'
+    # TODO: Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays
+    vals = var[variable][:]
+
+    # Fixed color scale, write file, turn off auto borders, set title, reverse lat direction so monotonically increasing??
+    imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:],
+             vmin=-2., vmax=45., outFile=figFile, autoBorders=False,
+             title='%s %d-day Mean from %s' % (variable.upper(), n, outFile))
+    print >>sys.stderr, 'Writing contour plot to %s' % figFile
+    return figFile
+
+
+def histogram(vals, variable, n, outFile):
+    figFile = os.path.splitext(outFile)[0] + '_hist.png'
+    M.clf()
+#    M.hist(vals, 47, (-2., 45.))
+    M.hist(vals, 94)
+    M.xlim(-5, 45)
+    M.xlabel('SST in degrees Celsius')
+    M.ylim(0, 300000)
+    M.ylabel('Count')
+    M.title('Histogram of %s %d-day Mean from %s' % (variable.upper(), n, outFile))
+    M.show()
+    print >>sys.stderr, 'Writing histogram plot to %s' % figFile
+    M.savefig(figFile)
+    return figFile
+
+
+def dailyFile2date(path, offset=1):
+    '''Convert YYYYDOY string in filename to date.'''
+    fn = os.path.split(path)[1]
+    year = int(fn[offset:offset+4])
+    doy = int(fn[offset+5:offset+8])
+    return fn[5:15].replace('.', '/')
+
+
+def formatRegion(r):
+    """Format lat/lon region specifier as string suitable for file name."""
+    if isinstance(r, str):
+        return r
+    else:
+        strs = [str(i).replace('-', 'm') for i in r]
+        return 'region-%s-%sby%s-%s' % tuple(strs)
+
+
+def formatGrid(r):
+    """Format lat/lon grid resolution specifier as string suitable for file name."""
+    if isinstance(r, str):
+        return r
+    else:
+        return str(r[0]) + 'by' + str(r[1])
+
+
+def main(args):
+    nEpochs = int(args[0])
+    nWindow = int(args[1])
+    nNeighbors = int(args[2])
+    averager = args[3]
+    optimization = args[4]
+    mode = args[5]
+    nWorkers = int(args[6])
+    urlFile = args[7]
+    urls = [s.strip() for s in open(urlFile, 'r')]
+    if averager == 'gaussInterp':
+        results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
+                             averager=averager, optimization=optimization, averagingConfig=GaussInterpConfig,
+                             mode=mode, nWorkers=nWorkers)
+    elif averager == 'spatialFilter':
+        results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
+                             averager=averager, optimization=optimization, averagingConfig=SpatialFilterConfig1,
+                             mode=mode, nWorkers=nWorkers)
+    elif averager == 'pixelMean':
+        results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
+                             averager=averager, optimization=optimization, averagingConfig=PixelMeanConfig,
+                             mode=mode, nWorkers=nWorkers)
+    else:
+        print >>sys.stderr, 'climatology2: Error, averager must be one of', AveragingFunctions.keys()
+        sys.exit(1)
+    
+    if results[0][1] is not None:
+        makeMovie([r[1] for r in results], 'clim.mpg')    
+    return results
+
+    
+if __name__ == '__main__':
+    print main(sys.argv[1:])
+
+
+# Old Tests:
+# python climatology2.py 5 5 0 pixelMean   fortran sequential 1 urls_sst_10days.txt
+# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_10days.txt
+
+# python climatology2.py 5 5 0 pixelMean   fortran sequential 1 urls_sst_40days.txt
+# python climatology2.py 5 5 0 pixelMean   fortran multicore  8 urls_sst_40days.txt
+# python climatology2.py 5 5 3 gaussInterp fortran multicore  8 urls_sst_40days.txt
+
+# Old Production:
+# python climatology2.py 5 5 0 pixelMean     fortran multicore  16 urls_sst_2015.txt  >& log &
+# python climatology2.py 10 10 0 pixelMean     fortran multicore  16 urls_sst_2015.txt  >& log &
+# python climatology2.py 5 5 3 gaussInterp   fortran multicore  16 urls_sst_2015.txt  >& log &
+
+
+# Tests:
+# python climatology2.py 5 5 0 pixelMean   fortran sequential 1 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 0 pixelMean   fortran multicore  4 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 3 gaussInterp fortran multicore  4 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 7 1 spatialFilter fortran sequential 1 urls_sst_daynight_5days_sorted.txt
+
+# Test number of neighbors needed:
+# python climatology2.py 5 7 3 gaussInterp fortran multicore  4 urls_sst_daynight_20days_sorted.txt
+
+
+# Production:
+# python climatology2.py 5 7 0 pixelMean fortran multicore  4 urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 3 gaussInterp fortran sequential 1 urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 3 gaussInterp fortran multicore  4 urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 1 spatialFilter fortran multicore  4 urls_sst_daynight_2003_2015_sorted.txt
+

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/cluster.py
----------------------------------------------------------------------
diff --git a/climatology/clim/cluster.py b/climatology/clim/cluster.py
new file mode 100644
index 0000000..7a76ad9
--- /dev/null
+++ b/climatology/clim/cluster.py
@@ -0,0 +1,84 @@
+#
+# cluster.py -- Parallel map function that uses a distributed cluster and multicore within each node
+#
+
+import os, sys
+from split import fixedSplit
+import pp
+#import dispy
+
+Servers = ("server-1", "server-2", "server-3", "server-4")
+NCores = 8
+
+
+def splitSeq(seq, n):
+    '''Split a sequence into N almost even parts.'''
+    m = int(len(seq)/n)
+    return fixedSplit(seq, m)
+
+
+def dmap(func,                  # function to parallel map across split sequence
+         seq,                   # sequence of data
+         nCores,                # number of cores to use on each node
+         servers=[],            # list of servers in the cluster
+         splitterFn=splitSeq,   # function to use to split the sequence of data
+        ): 
+    '''Parallel operations for a cluster of nodes, each with multiple cores.
+    '''
+    nServers = len(servers)
+    if nServers == 0: servers = ('localhost',)
+    jobServer = pp.Server(nCores, ppservers=servers)
+    splits = splitSeq(seq, nServers * nCores)
+    jobs = [(s, jobServer.submit(func, (s,), ('splitSeq',), globals=globals())) for s in splits]
+    results = [(j[0], j[1]()) for j in jobs]
+    return results
+
+
+class PPCluster:
+    '''Parallel operations for a cluster of nodes, each with multiple cores.
+    '''
+    def __init__(self, nCores=NCores,         # number of cores to use on each node
+                       servers=Servers,       # list of servers in the cluster
+                       splitterFn=splitSeq,   # function to use to split the sequence of data
+                ):
+        self.nCores = nCores
+        self.servers = servers
+        self.nServers = len(servers)
+        self.splitterFn = splitterFn
+        self.jobServer = pp.Server(nCores, ppservers=servers)
+
+    def dmap(self, func, seq):
+        '''Distributed map function that automatically uses a cluster of nodes and a set number of cores.
+A sequence of data is split across the cluster.
+        '''
+        splits = splitSeq(seq, self.nServers * self.nCores)
+        jobs = [(s, self.jobServer.submit(func, (s,), (splitterFn,), globals=globals())) for s in splits]
+        results = [(j[0], j[1]()) for j in jobs]
+        return results
+
+
+# Tests follow.
+
+def extractDay(url, substring):
+    '''Extract integer DOY from filename like urls.'''
+    i = url.find(substring)
+    return int(url[5:8])
+
+
+def main(args):
+    nCores = int(args[0])
+    servers = tuple(eval(args[1]))
+    urlFile = args[2]
+    urls = [s.strip() for s in open(urlFile, 'r')]
+
+#    results = PPCluster(nCores, servers).dmap(lambda u: extractDOY(u, 'A2015'), urls)
+    results = dmap(lambda u: extractDOY(u, 'A2015'), urls, nCores, servers, splitSeq)
+
+
+if __name__ == '__main__':
+    print main(sys.argv[1:])
+
+# python cluster.py 8 '["deepdata-1"]' urls_sst_2015.txt
+# python cluster.py 1 '["deepdata-1", "deepdata-2", "deepdata-3", "deepdata-4"]' urls_sst_2015.txt
+# python cluster.py 8 '["deepdata-1", "deepdata-2", "deepdata-3", "deepdata-4"]' urls_sst_2015.txt
+

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/cluster2.py
----------------------------------------------------------------------
diff --git a/climatology/clim/cluster2.py b/climatology/clim/cluster2.py
new file mode 100644
index 0000000..7a76ad9
--- /dev/null
+++ b/climatology/clim/cluster2.py
@@ -0,0 +1,84 @@
+#
+# cluster.py -- Parallel map function that uses a distributed cluster and multicore within each node
+#
+
+import os, sys
+from split import fixedSplit
+import pp
+#import dispy
+
+Servers = ("server-1", "server-2", "server-3", "server-4")
+NCores = 8
+
+
+def splitSeq(seq, n):
+    '''Split a sequence into N almost even parts.'''
+    m = int(len(seq)/n)
+    return fixedSplit(seq, m)
+
+
+def dmap(func,                  # function to parallel map across split sequence
+         seq,                   # sequence of data
+         nCores,                # number of cores to use on each node
+         servers=[],            # list of servers in the cluster
+         splitterFn=splitSeq,   # function to use to split the sequence of data
+        ): 
+    '''Parallel operations for a cluster of nodes, each with multiple cores.
+    '''
+    nServers = len(servers)
+    if nServers == 0: servers = ('localhost',)
+    jobServer = pp.Server(nCores, ppservers=servers)
+    splits = splitSeq(seq, nServers * nCores)
+    jobs = [(s, jobServer.submit(func, (s,), ('splitSeq',), globals=globals())) for s in splits]
+    results = [(j[0], j[1]()) for j in jobs]
+    return results
+
+
+class PPCluster:
+    '''Parallel operations for a cluster of nodes, each with multiple cores.
+    '''
+    def __init__(self, nCores=NCores,         # number of cores to use on each node
+                       servers=Servers,       # list of servers in the cluster
+                       splitterFn=splitSeq,   # function to use to split the sequence of data
+                ):
+        self.nCores = nCores
+        self.servers = servers
+        self.nServers = len(servers)
+        self.splitterFn = splitterFn
+        self.jobServer = pp.Server(nCores, ppservers=servers)
+
+    def dmap(self, func, seq):
+        '''Distributed map function that automatically uses a cluster of nodes and a set number of cores.
+A sequence of data is split across the cluster.
+        '''
+        splits = splitSeq(seq, self.nServers * self.nCores)
+        jobs = [(s, self.jobServer.submit(func, (s,), (splitterFn,), globals=globals())) for s in splits]
+        results = [(j[0], j[1]()) for j in jobs]
+        return results
+
+
+# Tests follow.
+
+def extractDay(url, substring):
+    '''Extract integer DOY from filename like urls.'''
+    i = url.find(substring)
+    return int(url[5:8])
+
+
+def main(args):
+    nCores = int(args[0])
+    servers = tuple(eval(args[1]))
+    urlFile = args[2]
+    urls = [s.strip() for s in open(urlFile, 'r')]
+
+#    results = PPCluster(nCores, servers).dmap(lambda u: extractDOY(u, 'A2015'), urls)
+    results = dmap(lambda u: extractDOY(u, 'A2015'), urls, nCores, servers, splitSeq)
+
+
+if __name__ == '__main__':
+    print main(sys.argv[1:])
+
+# python cluster.py 8 '["deepdata-1"]' urls_sst_2015.txt
+# python cluster.py 1 '["deepdata-1", "deepdata-2", "deepdata-3", "deepdata-4"]' urls_sst_2015.txt
+# python cluster.py 8 '["deepdata-1", "deepdata-2", "deepdata-3", "deepdata-4"]' urls_sst_2015.txt
+

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/datasets.py
----------------------------------------------------------------------
diff --git a/climatology/clim/datasets.py b/climatology/clim/datasets.py
new file mode 100644
index 0000000..226935b
--- /dev/null
+++ b/climatology/clim/datasets.py
@@ -0,0 +1,317 @@
+'''
+datasets.py -- Routines for dataset-specfic capabilities:  file handling, readers, etc.
+
+One Class for each dataset containing static methods and constants/templates, etc.
+
+'''
+
+import sys, os, re, datetime
+
+import numpy as np
+
+from split import splitByNDaysKeyed, groupByKeys, extractKeys
+
+
+def splitModisAod(seq, n):
+    return splitByNDaysKeyed(seq, n, re.compile(r'(....)(..)(..)'), lambda y, m, d: ymd2doy(y, m, d))
+
+
+def splitAvhrrSst(seq, n):
+    return splitByNDays_Avhrr(seq, n, re.compile(r'^(....)(..)(..)'))
+
+
+class ModisSst:
+    ExpectedRunTime = "28m"
+    UrlsPath = "/data/share/datasets/MODIS_L3_AQUA_11UM_V2014.0_4KM_DAILY/daily_data/A*SST*.nc"
+    ExampleFileName = 'A2010303.L3m_DAY_NSST_sst_4km.nc'
+    GetKeysRegex = r'A(....)(...).L3m_DAY_(.)S'
+
+    VariableName = 'sst'
+    Mask = None
+    Coordinates = ['lat', 'lon']
+
+    OutputClimTemplate = ''
+
+    @staticmethod
+    def keysTransformer(s): return (s[1], s[0], s[2])  # DOY, YEAR, N=night / S=day
+
+    @staticmethod
+    def getKeys(url):
+        return extractKeys(url, ModisSst.GetKeysRegex, ModisSst.keysTransformer)
+
+    @staticmethod
+    def split(seq, n):
+        return [u for u in splitByNDaysKeyed(seq, n, ModisSst.GetKeysRegex, ModisSst.keysTransformer)]
+
+    @staticmethod
+    def genOutputName(doy, variable, nEpochs, averagingConfig):
+        return 'A%03d.L3m_%s_%dday_clim_%s.nc' % (
+            doy, variable, nEpochs, averagingConfig['name'])  # mark each file with first day in period
+
+
+class ModisChlor:
+    ExpectedRunTime = "11m"
+    UrlsPath = "/Users/greguska/githubprojects/nexus/nexus-ingest/developer-box/data/modis_aqua_chl/A*chlor*.nc"
+    ExampleFileName = "A2013187.L3m_DAY_CHL_chlor_a_4km.nc"
+    GetKeysRegex = r'A(....)(...).L3m.*CHL'
+
+    Variable = 'chlor_a'
+    Mask = None
+    Coordinates = ['lat', 'lon']
+
+    OutputClimTemplate = ''
+
+    @staticmethod
+    def keysTransformer(s): return (s[1], s[0])  # DOY, YEAR
+
+    @staticmethod
+    def getKeys(url):
+        return extractKeys(url, ModisChlor.GetKeysRegex, ModisChlor.keysTransformer)
+
+    @staticmethod
+    def split(seq, n):
+        return [u for u in splitByNDaysKeyed(seq, n, ModisChlor.GetKeysRegex, ModisChlor.keysTransformer)]
+
+    @staticmethod
+    def genOutputName(doy, variable, nEpochs, averagingConfig):
+        return 'A%03d.L3m_%s_%dday_clim_%s.nc' % (
+            doy, variable, nEpochs, averagingConfig['name'])  # mark each file with first day in period
+
+
+class MeasuresSsh:
+    ExpectedRunTime = "2m22s"
+    UrlsPath = "/data/share/datasets/MEASURES_SLA_JPL_1603/daily_data/ssh_grids_v1609*12.nc"
+    ExampleFileName = "ssh_grids_v1609_2006120812.nc"
+    GetKeysRegex = r'ssh.*v1609_(....)(..)(..)12.nc'
+
+    Variable = 'SLA'  # sea level anomaly estimate
+    Mask = None
+    Coordinates = ['Longitude', 'Latitude']  # Time is first (len=1) coordinate, will be removed
+
+    OutputClimTemplate = ''
+
+    @staticmethod
+    def keysTransformer(s): return (ymd2doy(s[0], s[1], s[2]), s[0])  # DOY, YEAR
+
+    @staticmethod
+    def getKeys(url):
+        return extractKeys(url, MeasuresSsh.GetKeysRegex, MeasuresSsh.keysTransformer)
+
+    @staticmethod
+    def split(seq, n):
+        return [u for u in splitByNDaysKeyed(seq, n, MeasuresSsh.GetKeysRegex, MeasuresSsh.keysTransformer)]
+
+    @staticmethod
+    def genOutputName(doy, variable, nEpochs, averagingConfig):
+        return "ssh_grids_v1609_%03d_%dday_clim_%s.nc" % (int(doy), nEpochs, averagingConfig['name'])
+
+
+class CCMPWind:
+    ExpectedRunTime = "?"
+    UrlsPath = "/data/share/datasets/CCMP_V2.0_L3.0/daily_data/CCMP_Wind*_V02.0_L3.0_RSS_uncompressed.nc"
+    ExampleFileName = "CCMP_Wind_Analysis_20160522_V02.0_L3.0_RSS_uncompressed.nc"
+    GetKeysRegex = r'CCMP_Wind_Analysis_(....)(..)(..)_V.*.nc'
+
+    Variable = 'Wind_Magnitude'  # to be computed as sqrt(uwnd^2 + vwnd^2)
+    Mask = None
+    Coordinates = ['latitude', 'longitude']
+
+    OutputClimTemplate = ''
+
+    @staticmethod
+    def keysTransformer(s):
+        return (ymd2doy(s[0], s[1], s[2]), s[0])  # DOY, YEAR
+
+    @staticmethod
+    def getKeys(url):
+        return extractKeys(url, CCMPWind.GetKeysRegex, CCMPWind.keysTransformer)
+
+    @staticmethod
+    def split(seq, n):
+        return [u for u in splitByNDaysKeyed(seq, n, CCMPWind.GetKeysRegex, CCMPWind.keysTransformer)]
+
+    @staticmethod
+    def genOutputName(doy, variable, nEpochs, averagingConfig):
+        return "CCMP_Wind_Analysis_V02.0_L3.0_RSS_%03d_%dday_clim_%s.nc" % (int(doy), nEpochs, averagingConfig['name'])
+
+    @staticmethod
+    def readAndMask(url, variable, mask=None, cachePath='/tmp/cache', hdfsPath=None):
+        """
+        Read a variable from a netCDF or HDF file and return a numpy masked array.
+        If the URL is remote or HDFS, first retrieve the file into a cache directory.
+        """
+        from variables import getVariables, close
+        v = None
+        if mask:
+            variables = [variable, mask]
+        else:
+            variables = [variable]
+        try:
+            from cache import retrieveFile
+            path = retrieveFile(url, cachePath, hdfsPath)
+        except:
+            print >> sys.stderr, 'readAndMask: Error, continuing without file %s' % url
+            return v
+
+        if CCMPWind.Variable in variables:
+            var, fh = getVariables(path, ['uwnd','vwnd'], arrayOnly=True,
+                                   set_auto_mask=True)  # return dict of variable objects by name
+            uwnd_avg = np.average(var['uwnd'], axis=0)
+            vwnd_avg = np.average(var['vwnd'], axis=0)
+            wind_magnitude = np.sqrt(np.add(np.multiply(uwnd_avg, uwnd_avg), np.multiply(vwnd_avg, vwnd_avg)))
+            v = wind_magnitude
+            if v.shape[0] == 1: v = v[0]  # throw away trivial time dimension for CF-style files
+            close(fh)
+        else:
+            try:
+                print >> sys.stderr, 'Reading variable %s from %s' % (variable, path)
+                var, fh = getVariables(path, variables, arrayOnly=True,
+                                       set_auto_mask=True)  # return dict of variable objects by name
+                v = var[
+                    variable]  # could be masked array
+                if v.shape[0] == 1: v = v[0]  # throw away trivial time dimension for CF-style files
+                close(fh)
+            except:
+                print >> sys.stderr, 'readAndMask: Error, cannot read variable %s from file %s' % (variable, path)
+
+        return v
+
+class MonthlyClimDataset:
+    ExpectedRunTime = "2m"
+    UrlsPath = ''
+    ExampleFileName = ''
+    GetKeysRegex = r'(YYYY)(MM)(DD)' # Regex to extract year, month, day
+    Variable = 'var' # Variable name in granule
+    Mask = None
+    Coordinates = ['lat', 'lon']
+    OutputClimTemplate = ''
+
+    @staticmethod
+    def keysTransformer(s): 
+        return (s[1],s[0]) # MONTH, YEAR
+
+    @staticmethod
+    def getKeys(url):
+        return extractKeys(url, MonthlyClimDataset.GetKeysRegex, 
+                           MonthlyClimDataset.keysTransformer)
+
+    @staticmethod
+    def split(seq, n):
+        return [u for u in splitByNDaysKeyed(seq, n, 
+                                             MonthlyClimDataset.GetKeysRegex, 
+                                             MonthlyClimDataset.keysTransformer)]
+
+    @staticmethod
+    def genOutputName(month, variable, nEpochs, averagingConfig):
+        # Here we use the 15th of the month to get DOY and just use any
+        # non-leap year.
+        doy = datetime2doy(ymd2datetime(2017, month, 15))
+        return 'monthly_clim_%s_%03d_month%02d_nepochs%d_%s.nc' % (
+            variable, doy, month, nEpochs, 
+            averagingConfig['name'])  # mark each file with month
+
+
+class SMAP_L3M_SSS(MonthlyClimDataset):
+    UrlsPath = "/data/share/datasets/SMAP_L3_SSS/monthly/RSS_smap_SSS_monthly_*.nc"
+    ExampleFileName = 'RSS_smap_SSS_monthly_2015_04_v02.0.nc'
+    GetKeysRegex = r'RSS_smap_SSS_monthly_(....)_(..)_v02'
+    Variable = 'sss_smap'
+
+    @staticmethod
+    def getKeys(url):
+        return extractKeys(url, SMAP_L3M_SSS.GetKeysRegex, 
+                           SMAP_L3M_SSS.keysTransformer)
+
+    @staticmethod
+    def split(seq, n):
+        return [u for u in splitByNDaysKeyed(seq, n, 
+                                             SMAP_L3M_SSS.GetKeysRegex, 
+                                             SMAP_L3M_SSS.keysTransformer)]
+
+    @staticmethod
+    def genOutputName(month, variable, nEpochs, averagingConfig):
+        # Here we use the 15th of the month to get DOY and just use any
+        # non-leap year.
+        doy = datetime2doy(ymd2datetime(2017, month, 15))
+        return '%s_L3m_clim_doy%03d_month%02d_nepochs%d_%s.nc' % (
+            variable, doy, month, nEpochs, 
+            averagingConfig['name'])  # mark each file with month
+
+class GRACE_Tellus(MonthlyClimDataset):
+    GetKeysRegex = r'GRCTellus.JPL.(....)(..)(..).GLO'
+    Variable = 'lwe_thickness' # Liquid_Water_Equivalent_Thickness
+
+    @staticmethod
+    def getKeys(url):
+        return extractKeys(url, GRACE_Tellus.GetKeysRegex, 
+                           GRACE_Tellus.keysTransformer)
+
+    @staticmethod
+    def split(seq, n):
+        return [u for u in splitByNDaysKeyed(seq, n, 
+                                             GRACE_Tellus.GetKeysRegex, 
+                                             GRACE_Tellus.keysTransformer)]
+
+    @staticmethod
+    def genOutputName(month, variable, nEpochs, averagingConfig):
+        # Here we use the 15th of the month to get DOY and just use any
+        # non-leap year.
+        doy = datetime2doy(ymd2datetime(2017, month, 15))
+        return 'GRACE_Tellus_monthly_%s_%03d_month%02d_nepochs%d_%s.nc' % (
+            variable, doy, month, nEpochs, 
+            averagingConfig['name'])  # mark each file with month
+
+class GRACE_Tellus_monthly_land(GRACE_Tellus):
+    UrlsPath = "/data/share/datasets/GRACE_Tellus/monthly_land/GRCTellus.JPL.*.nc"
+    ExampleFileName = "GRCTellus.JPL.20150122.GLO.RL05M_1.MSCNv02CRIv02.land.nc"
+
+    @staticmethod
+    def genOutputName(month, variable, nEpochs, averagingConfig):
+        # Here we use the 15th of the month to get DOY and just use any
+        # non-leap year.
+        doy = datetime2doy(ymd2datetime(2017, month, 15))
+        return 'GRACE_Tellus_monthly_land_%s_%03d_month%02d_nepochs%d_%s.nc' % (
+            variable, doy, month, nEpochs, 
+            averagingConfig['name'])  # mark each file with month
+
+class GRACE_Tellus_monthly_ocean(GRACE_Tellus):
+    UrlsPath = "/data/share/datasets/GRACE_Tellus/monthly_ocean/GRCTellus.JPL.*.nc"
+    ExampleFileName = "GRCTellus.JPL.20150122.GLO.RL05M_1.MSCNv02CRIv02.ocean.nc"
+
+    @staticmethod
+    def genOutputName(month, variable, nEpochs, averagingConfig):
+        # Here we use the 15th of the month to get DOY and just use any
+        # non-leap year.
+        doy = datetime2doy(ymd2datetime(2017, month, 15))
+        return 'GRACE_Tellus_monthly_ocean_%s_%03d_month%02d_nepochs%d_%s.nc'%(
+            variable, doy, month, nEpochs, 
+            averagingConfig['name'])  # mark each file with month
+
+
+DatasetList = {'ModisSst': ModisSst, 'ModisChlor': ModisChlor,
+               'MeasuresSsh': MeasuresSsh, 'CCMPWind': CCMPWind,
+               'SMAP_L3M_SSS': SMAP_L3M_SSS, 
+               'GRACE_Tellus_monthly_ocean': GRACE_Tellus_monthly_ocean,
+               'GRACE_Tellus_monthly_land': GRACE_Tellus_monthly_land}
+
+
+# Utils follow.
+def ymd2doy(year, mon, day):
+    return datetime2doy(ymd2datetime(year, mon, day))
+
+
+def ymd2datetime(y, m, d):
+    y, m, d = map(int, (y, m, d))
+    return datetime.datetime(y, m, d)
+
+
+def datetime2doy(dt):
+    return int(dt.strftime('%j'))
+
+
+def doy2datetime(year, doy):
+    '''Convert year and DOY (day of year) to datetime object.'''
+    return datetime.datetime(int(year), 1, 1) + datetime.timedelta(int(doy) - 1)
+
+
+def doy2month(year, doy): return doy2datetime(year, doy).strftime('%m')

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/dparkTest.py
----------------------------------------------------------------------
diff --git a/climatology/clim/dparkTest.py b/climatology/clim/dparkTest.py
new file mode 100644
index 0000000..be92cbd
--- /dev/null
+++ b/climatology/clim/dparkTest.py
@@ -0,0 +1,9 @@
+
+import dpark
+
+lines = dpark.textFile("/usr/share/dict/words", 128)
+#lines = dpark.textFile("/usr/share/doc/gcc-4.4.7/README.Portability", 128)
+words = lines.flatMap(lambda x:x.split()).map(lambda x:(x,1))
+wc = words.reduceByKey(lambda x,y:x+y).collectAsMap()
+print wc[wc.keys[0]]
+

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/gaussInterp.py
----------------------------------------------------------------------
diff --git a/climatology/clim/gaussInterp.py b/climatology/clim/gaussInterp.py
new file mode 100644
index 0000000..f694d7a
--- /dev/null
+++ b/climatology/clim/gaussInterp.py
@@ -0,0 +1,43 @@
+#
+# gaussInterp routine -- Gaussian weighted smoothing in lat, lon, and time
+#
+# Based on Ed Armstrong's routines.
+#
+# Calls into optimized routines in Fortran or cython.
+# See gaussInterp_f.f  or gaussInterp.pyx
+#
+
+import numpy as N, time
+from gaussInterp_f import gaussinterp_f
+
+
+def gaussInterp(var,                      # bundle of input arrays: masked variable, coordinates
+                varNames,                 # list of names in order: primary variable, coordinates in order lat, lon, time
+                outlat, outlon,           # output lat/lon coordinate vectors
+                wlat, wlon,               # window of lat/lon neighbors to gaussian weight, expressed in delta lat (degrees)
+                slat, slon, stime,        # sigma for gaussian downweighting with distance in lat, lon (deg), & time (days)
+                vfactor=-0.6931,          # factor in front of gaussian expression
+                missingValue=-9999.,      # value to mark missing values in interp result
+                verbose=1,                # integer to set verbosity level
+                optimization='fortran'):  # Mode of optimization, using 'fortran' or 'cython'
+    '''Gaussian interpolate in lat, lon, and time to a different lat/lon grid, and over a time window to the center time.
+Bundle of arrays (var) contains a 3D masked variable and coordinate arrays for lat, lon, and time read from netdf/hdf files.
+Returns the 2D interpolated variable (masked) and a status for failures. 
+    '''
+    v = var[varNames[0]][:]                     # real*4 in Fortran code, is v.dtype correct?
+    vmask = N.ma.getmask(v).astype('int8')[:]   # integer*1, convert bool mask to one-byte integer for Fortran
+    vtime  = var[varNames[1]][:]                # integer*4 in Fortran
+    lat = var[varNames[2]][:]                   # real*4
+    lon = var[varNames[3]][:]                   # real*4
+    if optimization == 'fortran':
+        vinterp, vweight, status = \
+             gaussinterp_f(v, vmask, vtime, lat, lon,
+                           outlat, outlon, wlat, wlon, slat, slon, stime, vfactor, missingValue, verbose)
+    else:
+        vinterp, vweight, status = \
+             gaussInterp_(v, vmask, vtime, lat, lon,
+                          outlat, outlon, wlat, wlon, slat, slon, stime, vfactor, missingValue, verbose)
+
+    vinterp = N.ma.masked_where(vweight == 0.0, vinterp)
+    return (vinterp, vweight, status)
+

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/gaussInterp.pyx
----------------------------------------------------------------------
diff --git a/climatology/clim/gaussInterp.pyx b/climatology/clim/gaussInterp.pyx
new file mode 100644
index 0000000..943e298
--- /dev/null
+++ b/climatology/clim/gaussInterp.pyx
@@ -0,0 +1,131 @@
+c
+c     gaussInterp routine -- Gaussian weighted smoothing in lat, lon, and time
+c
+c Based on Ed Armstrong's routines. Designed to be called from python using f2py.
+c
+c
+c Gaussian weighting = exp( vfactor * (((x - x0)/sx)^2 + ((y - y0)/sy)^2 + ((t - t0)/st)^2 ))
+c
+c where deltas are distances in lat, lon and time and sx, sy, st are one e-folding sigmas.
+c
+c Cutoffs for neighbors allowed in the interpolation are set by distance in lat/lon (see dlat/dlon);
+c for time all epochs are included.
+c
+
+import sys
+import numpy as np
+cimport numpy as np
+from math import exp
+
+from gaussInterp_f import gaussInterp_f
+
+Var_t = np.float
+ctypedef np.float_t Var_t
+
+VERBOSE = 1
+
+
+def gaussInterp(var,                     # bundle of input arrays: masked variable, coordinates
+                varNames,                # list of names in order: primary variable, coordinates in order lat, lon, time
+                outlat, outlon,          # output lat/lon coordinate vectors
+                wlat, wlon,              # window of lat/lon neighbors to gaussian weight, expressed in delta lat (degrees)
+                slat, slon, stime,       # sigma for gaussian downweighting with distance in lat, lon (deg), & time (days)
+                vfactor=-0.6931,         # factor in front of gaussian expression
+                missingValue=-9999.,     # value to mark missing values in interp result
+                verbose=VERBOSE,         # integer to set verbosity level
+                optimization='cython'):  # Mode of optimization, using 'fortran' or 'cython'
+    '''Gaussian interpolate in lat, lon, and time to a different lat/lon grid, and over a time window to the center time.
+Bundle of arrays (var) contains a 3D masked variable and coordinate arrays for lat, lon, and time read from netdf/hdf files.
+Returns the 2D interpolated variable (masked) and a status for failures. 
+    '''
+    v = var[varNames[0]][:]
+    vmask = np.ma.getmask(v)[:]
+    vtime = var[varNames[1]][:]
+    lat = var[varNames[2]][:]
+    lon = var[varNames[3]][:]
+    if optimization == 'fortran':
+        vinterp, vweight, status = 
+             gaussInterp_f(v, vmask, vtime, lat, lon,
+                           outlat, outlon, wlat, wlon, slat, slon, stime, vfactor, missingValue)
+    else:
+        vinterp, vweight, status = 
+             gaussInterp_(v, vmask, vtime, lat, lon,
+                          outlat, outlon, wlat, wlon, slat, slon, stime, vfactor, missingValue)
+    vinterp = np.ma.array(vinterp, mask=np.ma.make_mask(vweight))    # apply mask
+    return (vinterp, vweight, status)
+
+
+#@cython.boundscheck(False)
+def gaussInterp_(np.ndarray[Var_t, ndim=3] var,          # variable & mask arrays with dimensions of lat,lon,time
+                 np.ndarray[Var_t, ndim=3] vmask,         
+                 np.ndarray[np.int_t, ndim=1] vtime,
+                 np.ndarray[np.float_t, ndim=1] lat,
+                 np.ndarray[np.float_t, ndim=1] lon,     # coordinate vectors for inputs
+                 np.ndarray[np.float_t, ndim=1] outlat,  # coordinate vectors for grid to interpolate to
+                 np.ndarray[np.float_t, ndim=1] outlon,
+                 float wlat, float wlon,                 # window of lat/lon neighbors to gaussian weight, expressed in delta lat (degrees)
+                 float slat, float slon, float stime,    # sigma for gaussian downweighting with distance in lat, lon (deg), & time (days)
+                 float vfactor,                          # factor in front of gaussian expression
+                 float missingValue):                    # value to mark missing values in interp result
+    '''Gaussian interpolate in lat, lon, and time to a different lat/lon grid, and over a time window to the center time.
+Returns the 2D interpolated variable (masked), the weight array, and a status for failures.
+    '''
+    assert var.dtype == Var_t  and mask.dtype == np.int
+    
+    cdef np.ndarray[Var_t, ndim=2] vinterp = np.zeros( (outlat.shape[0], outlon.shape[0]), dtype=Var_t )  # interpolated variable, missing values not counted
+    cdef np.ndarray[Var_t, ndim=2] vweight = np.zeros( (outlat.shape[0], outlon.shape[0]), dtype=Var_t )  # weight of values interpolated (can be zero)
+    cdef int status = 0     # negative status indicates error
+
+    cdef int imin, imax, jmin, jmax
+    cdef int iin, jin, kin
+    cdef int i, j
+
+    cdef int ntime = time.shape[0]
+    cdef int nlat = lat.shape[0]
+    cdef int nlon = lon.shape[0]
+
+    cdef int noutlat = outlat.shape[0]
+    cdef int noutlon = outlon.shape[0]
+
+    cdef float wlat2 = wlat / 2.
+    cdef float wlon2 = wlon / 2.
+    cdef float lat0 = lat[0]
+    cdef float lon0 = lon[0]
+    cdef float dlat = lat[1] - lat[0]
+    cdef float dlon = lon[1] - lon[0]
+    cdef double midTime = time[int(ntime/2 + 0.5)]
+
+    for i xrange(noutlat):
+        print >>sys.stderr, outlat[i]
+        for j in xrange(noutlon):
+           imin = clamp(int((outlat[i] - wlat2 - lat0)/dlat + 0.5), 0, nlat-1)
+           imax = clamp(int((outlat[i] + wlat2 - lat0)/dlat + 0.5), 0, nlat-1)
+           jmin = clamp(int((outlon[j] - wlon2 - lon0)/dlon + 0.5), 0, nlon-1)
+           jmax = clamp(int((outlon[j] + wlon2 - lon0)/dlon + 0.5), 0, nlon-1)
+
+           for kin in xrange(ntime):
+               for iin in xrange(imin, imax+1):
+                   for jin in xrange(jmin, jmax+1):
+                       if not vmask[kin, iin, jin]:
+                           fac = exp( vfactor *
+                                     (((outlat[i] - lat[iin])/slat)**2
+                                    + ((outlon[j] - lon[jin])/slon)**2
+                                    + ((midTime   - vtime[kin])/stime)**2))
+                           val = var[kin, iin, jin]
+                           if VERBOSE > 1: print >>sys.stderr,  kin, iin, jin, vtime[kin], lat[iin], lon[jin], val, fac, val*fac
+
+                           vinterp[i,j] = vinterp[i,j] + val * fac
+                           vweight[i,j] = vweight[i,j] + fac
+
+           if vweight[i,j] != 0.0:
+               vinterp[i,j] = vinterp[i,j] / vweight[i,j]
+          else:
+               vinterp[i,j] = missingValue
+
+    return (vinterp, vweight, status)
+
+
+cdef int clamp(int i, int n, int m):
+    if i < n: return n
+    if i > m: return m
+    return i