Posted to commits@sdap.apache.org by le...@apache.org on 2017/10/27 22:40:16 UTC
[39/51] [partial] incubator-sdap-nexus git commit: SDAP-1 Import all code under the SDAP SGA
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/climatology1.py
----------------------------------------------------------------------
diff --git a/climatology/clim/climatology1.py b/climatology/clim/climatology1.py
new file mode 100755
index 0000000..940a8d0
--- /dev/null
+++ b/climatology/clim/climatology1.py
@@ -0,0 +1,233 @@
+"""
+climatology1.py
+
+Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids.
+
+Simple code to be run on Spark cluster, or using multi-core parallelism on single machine.
+
+"""
+
+import sys, os, calendar, urlparse, urllib
+from datetime import datetime
+import numpy as N
+from variables import getVariables, close
+#from timePartitions import partitionFilesByKey
+from split import fixedSplit
+#from stats import Stats
+from pathos.multiprocessing import ProcessingPool as Pool
+from plotlib import imageMap, makeMovie
+
+#from gaussInterp import gaussInterp
+
+VERBOSE = 1
+
+# Possible execution modes
+# Multicore & cluster modes use pathos pool.map(); Spark mode uses PySpark cluster.
+ExecutionModes = ['sequential', 'multicore', 'cluster', 'spark']
+
+# SST L3m 4.6km Metadata
+# SST values are scaled integers in degrees Celsius; the lat/lon grid is 4320 x 8640
+# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lat', 'lon']
+
+# Generate algorithmic name for N-day Climatology product
+SSTClimatologyTemplate = 'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc' #??
+
+# Simple mask and average functions to get us started, then add gaussian interpolation.
+# MODIS L3 SST product: qual_sst ranges over [-1, 2]; 0 flags the best data, and including 1 improves coverage
+def qcMask(var, mask): return N.ma.array(var, mask=N.ma.make_mask(mask))
+#def qcMask(var, mask): return N.ma.masked_where(mask != 0, var)
+
+def average(a): return N.ma.mean(a, axis=0)
+#AveragingFunctions = {'pixelAverage': average, 'gaussInterp': gaussInterp}
+AveragingFunctions = {'pixelAverage': average, 'gaussInterp': average}
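+
+# Minimal sketch (illustrative only; never called by the pipeline): mask two 2x2
+# epochs with qcMask and take their per-pixel mean with average().
+def _maskAndAverageDemo():
+    sst  = N.arange(8, dtype=N.float32).reshape((2, 2, 2))   # two 2x2 daily grids
+    qual = N.array([[[0, 1], [0, 0]],
+                    [[0, 0], [2, 0]]])                       # nonzero quality flag = masked out
+    return average(qcMask(sst, qual))                        # -> [[2., 5.], [2., 5.]]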
+
+
+def climByAveragingPeriods(urls, # list of (daily) granule URLs for a long time period (e.g. a year)
+ nEpochs, # compute a climatology for every N epochs (days) by 'averaging'
+ nWindow, # number of epochs in window needed for averaging
+ variable, # name of primary variable in file
+ mask, # name of mask variable
+ coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
+ maskFn=qcMask, # mask function to compute mask from mask variable
+ averager='pixelAverage', # averaging function to use, one of ['pixelAverage', 'gaussInterp']
+ mode='sequential', # Map across time periods of N-days for concurrent work, executed by:
+ # 'sequential' map, 'multicore' using pool.map(), 'cluster' using pathos pool.map(),
+ # or 'spark' using PySpark
+ numNodes=1, # number of cluster nodes to use
+ nWorkers=4, # number of parallel workers per node
+ averagingFunctions=AveragingFunctions, # dict of possible averaging functions
+                           legalModes=ExecutionModes       # list of possible execution modes
+ ):
+ '''Compute a climatology every N days by applying a mask and averaging function.
+Writes the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
+***Assumption: This routine assumes that the N grids will fit in memory.***
+ '''
+    try:
+        averageFn = averagingFunctions[averager]
+    except KeyError:
+        averageFn = average
+        print >>sys.stderr, 'climatology: Error, averager must be one of: %s; falling back to pixelAverage.' % str(averagingFunctions.keys())
+
+ urlSplits = [s for s in fixedSplit(urls, nEpochs)]
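+    # e.g. with nEpochs=5 a year of 365 daily URLs yields 73 five-day splits
+    # (assuming fixedSplit produces consecutive chunks of length nEpochs)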
+ if VERBOSE: print >>sys.stderr, urlSplits
+
+ def climsContoured(urls):
+ n = len(urls)
+ var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn)
+ return contourMap(var, variable, coordinates, n, urls[0])
+
+ if mode == 'sequential':
+ plots = map(climsContoured, urlSplits)
+ elif mode == 'multicore':
+ pool = Pool(nWorkers)
+ plots = pool.map(climsContoured, urlSplits)
+    elif mode == 'cluster':
+        plots = []                           # TODO: cluster mode not yet implemented
+    elif mode == 'spark':
+        plots = []                           # TODO: spark mode not yet implemented
+
+ print plots
+ return plots
+# return makeMovie(plots, 'clim.mpg')
+
+
+def climByAveraging(urls, # list of granule URLs for a time period
+ variable, # name of primary variable in file
+ mask, # name of mask variable
+ coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
+ maskFn=qcMask, # mask function to compute mask from mask variable
+ averageFn=average # averaging function to use
+ ):
+ '''Compute a climatology over N arrays by applying a mask and averaging function.
+Returns the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
+***Assumption: This routine assumes that the N grids will fit in memory.***
+ '''
+ n = len(urls)
+ varList = [variable, mask]
+ for i, url in enumerate(urls):
+        fn = retrieveFile(url, os.path.expanduser('~/cache'))
+ if VERBOSE: print >>sys.stderr, 'Read variables and mask ...'
+ var, fh = getVariables(fn, varList) # return dict of variable objects by name
+ if i == 0:
+ dtype = var[variable].dtype
+ shape = (n,) + var[variable].shape
+ accum = N.ma.empty(shape, dtype)
+
+ v = maskFn(var[variable], var[mask]) # apply quality mask variable to get numpy MA
+# v = var[variable][:]
+ accum[i] = v # accumulate N arrays for 'averaging'
+ if i+1 != len(urls): # keep var dictionary from last file to grab metadata
+ close(fh) # REMEMBER: closing fh loses in-memory data structures
+
+ if VERBOSE: print >>sys.stderr, 'Averaging ...'
+ coord, fh = getVariables(fn, coordinates) # read coordinate arrays and add to dict
+ for c in coordinates: var[c] = coord[c][:]
+
+    avg = averageFn(accum)                   # call averaging function ('gaussInterp' is stubbed to the pixel average above)
+    # When a real gaussInterp import is enabled, route to it here; it takes extra
+    # grid/window arguments, roughly:
+    #   varNames = [variable] + coordinates
+    #   avg, vweight, status = \
+    #       gaussInterp(var, varNames, latGrid, lonGrid, wlat, wlon, slat, slon, stime, vfactor, missingValue)
+
+ var['attributes'] = var[variable].__dict__ # save attributes of primary variable
+ var[variable] = avg # return primary variable & mask arrays in dict
+ var[mask] = N.ma.getmask(avg)
+
+# close(fh) # Can't close, lose netCDF4.Variable objects, leaking two fh
+ return var
+
+
+def contourMap(var, variable, coordinates, n, url):
+ p = urlparse.urlparse(url)
+ filename = os.path.split(p.path)[1]
+    # return filename                        # debugging short-circuit left in the original; removed so the plot is produced
+ outFile = filename + '.png'
+ # Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays
+ vals = var[variable][:]
+
+ # Fixed color scale, write file, turn off auto borders, set title, reverse lat direction so monotonically increasing??
+ imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:],
+ vmin=-2., vmax=45., outFile=outFile, autoBorders=False,
+ title='%s %d-day Mean from %s' % (variable.upper(), n, filename))
+ print >>sys.stderr, 'Writing contour plot to %s' % outFile
+ return outFile
+
+
+def isLocalFile(url):
+ '''Check if URL is a local path.'''
+ u = urlparse.urlparse(url)
+ if u.scheme == '' or u.scheme == 'file':
+        if not os.path.exists(u.path):
+ print >>sys.stderr, 'isLocalFile: File at local path does not exist: %s' % u.path
+ return (True, u.path)
+ else:
+ return (False, u.path)
+
+
+def retrieveFile(url, dir=None):
+ '''Retrieve a file from a URL, or if it is a local path then verify it exists.'''
+ if dir is None: dir = './'
+ ok, path = isLocalFile(url)
+ fn = os.path.split(path)[1]
+ outPath = os.path.join(dir, fn)
+ if not ok:
+ if os.path.exists(outPath):
+ print >>sys.stderr, 'retrieveFile: Using cached file: %s' % outPath
+ else:
+ try:
+ print >>sys.stderr, 'retrieveFile: Retrieving (URL) %s to %s' % (url, outPath)
+ urllib.urlretrieve(url, outPath)
+ except:
+ print >>sys.stderr, 'retrieveFile: Cannot retrieve file at URL: %s' % url
+ return None
+ return outPath
+
+
+def dailyFile2date(path, offset=1):
+    '''Convert the YYYYDDD string in a filename (e.g. A2010303...) to a YYYY/MM/DD date string.'''
+    fn = os.path.split(path)[1]
+    year = int(fn[offset:offset+4])
+    doy = int(fn[offset+4:offset+7])         # assumes the AYYYYDDD naming convention used elsewhere in this module
+    return datetime.strptime('%04d %03d' % (year, doy), '%Y %j').strftime('%Y/%m/%d')
+
+
+def formatRegion(r):
+ """Format lat/lon region specifier as string suitable for file name."""
+ if isinstance(r, str):
+ return r
+ else:
+ strs = [str(i).replace('-', 'm') for i in r]
+ return 'region-%s-%sby%s-%s' % tuple(strs)
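+# e.g. formatRegion((-10, 10, -5, 5)) -> 'region-m10-10bym5-5'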
+
+
+def formatGrid(r):
+ """Format lat/lon grid resolution specifier as string suitable for file name."""
+ if isinstance(r, str):
+ return r
+ else:
+ return str(r[0]) + 'by' + str(r[1])
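+# e.g. formatGrid((0.25, 0.5)) -> '0.25by0.5'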
+
+
+def main(args):
+ nEpochs = int(args[0])
+ nWindow = int(args[1])
+ averager = args[2]
+ mode = args[3]
+ nWorkers = int(args[4])
+ urlFile = args[5]
+ urls = [s.strip() for s in open(urlFile, 'r')]
+ return climByAveragingPeriods(urls, nEpochs, nWindow, 'sst', 'qual_sst', ['lat', 'lon'],
+ averager=averager, mode=mode, nWorkers=nWorkers)
+
+
+if __name__ == '__main__':
+ print main(sys.argv[1:])
+
+
+# python climatology1.py 5 5 pixelAverage sequential 1 urls_sst_10days.txt
+# python climatology1.py 5 5 gaussInterp multicore 8 urls_sst_40days.txt
+
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/climatology2.py
----------------------------------------------------------------------
diff --git a/climatology/clim/climatology2.py b/climatology/clim/climatology2.py
new file mode 100755
index 0000000..231efb7
--- /dev/null
+++ b/climatology/clim/climatology2.py
@@ -0,0 +1,453 @@
+"""
+climatology2.py
+
+Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids.
+
+Simple code to be run on Spark cluster, or using multi-core parallelism on single machine.
+
+"""
+
+import sys, os, urlparse, urllib, re, time, datetime
+import numpy as N
+import matplotlib
+matplotlib.use('Agg')
+import matplotlib.pylab as M
+
+from variables import getVariables, close
+from cache import retrieveFile, CachePath
+from split import fixedSplit, splitByNDays
+from netCDF4 import Dataset, default_fillvals
+from pathos.multiprocessing import ProcessingPool as Pool
+from plotlib import imageMap, makeMovie
+
+from spatialFilter import spatialFilter
+from gaussInterp import gaussInterp # calls into Fortran version gaussInterp_f.so
+#from gaussInterp_slow import gaussInterp_slow as gaussInterp # pure python, slow debuggable version
+
+#import pyximport; pyximport.install(pyimport=False)
+#from gaussInterp import gaussInterp, gaussInterp_ # attempted cython version, had issues
+
+VERBOSE = 1
+
+# Possible execution modes
+# Multicore & cluster modes use pathos pool.map(); Spark mode uses PySpark cluster.
+ExecutionModes = ['sequential', 'multicore', 'cluster', 'spark']
+
+# SST L3m 4.6km Metadata
+# SST values are scaled integers in degrees Celsius; the lon/lat grid is 8640 x 4320
+# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lon', 'lat']
+
+# Generate algorithmic name for N-day Climatology product
+SSTClimatologyTemplate = 'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc' #??
+
+# Simple mask and average functions to get us started, then add gaussian interpolation.
+# MODIS L3 SST product: qual_sst ranges over [-1, 2]; 0 flags the best data, and including 1 improves coverage
+#def qcMask(var, mask): return N.ma.array(var, mask=N.ma.make_mask(mask == 0))
+
+def qcMask(var, mask): return N.ma.masked_where(mask != 0, var)
+#def qcMask(var, mask): return N.ma.masked_where(mask < 0, var)
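+# e.g. qcMask(N.array([20., 21.]), N.array([0, 2])) -> masked array [20.0, --]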
+
+def splitModisSst(seq, n):
+ for chunk in splitByNDays(seq, n, re.compile(r'(...).L3m'), keyed=False):
+ yield chunk
+
+def mean(a): return N.ma.mean(a, axis=0)
+
+AveragingFunctions = {'pixelMean': mean, 'gaussInterp': gaussInterp, 'spatialFilter': spatialFilter}
+
+PixelMeanConfig = {'name': 'pixelMean'}
+
+GaussInterpConfig = {'name': 'gaussInterp',
+ 'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid
+ 'wlat': 3, 'wlon': 3,
+ 'slat': 0.15, 'slon': 0.15, 'stime': 1,
+ 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+GaussInterpConfig1a = {'name': 'gaussInterp',
+ 'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid
+ 'wlat': 0.30, 'wlon': 0.30,
+ 'slat': 0.15, 'slon': 0.15, 'stime': 1,
+ 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+GaussInterpConfig1b = {'name': 'gaussInterp',
+ 'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid
+ 'wlat': 0.08, 'wlon': 0.08,
+ 'slat': 0.15, 'slon': 0.15, 'stime': 1,
+ 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+GaussInterpConfig2 = {'name': 'gaussInterp',
+ 'latGrid': (89.5, -89.5, -0.25), 'lonGrid': (-180., 179., 0.25),
+ 'wlat': 2., 'wlon': 2.,
+ 'slat': 0.15, 'slon': 0.15, 'stime': 1,
+ 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+FilterGaussian = [[1, 2, 1], [2, 4, 2], [1, 2, 1]] # divide by 16
+FilterLowPass = [[1, 1, 1], [1, 1, 1], [1, 1, 1]] # divide by 9
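+# Minimal numpy sketch (illustrative only) of what one of these 3x3 kernels does to a
+# 2-D grid, assuming scipy is available; production applies the kernel inside the
+# Fortran spatialFilter routine instead:
+#   from scipy.ndimage import convolve
+#   smoothed = convolve(grid, N.array(FilterGaussian) / 16., mode='nearest')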
+
+SpatialFilterConfig1 = {'name': 'spatialFilter', 'normalization': 16.,
+ 'spatialFilter': N.array(FilterGaussian, dtype=N.int32),
+ 'missingValue': default_fillvals['f4']}
+
+SpatialFilterConfig2 = {'name': 'spatialFilter', 'normalization': 9.,
+ 'spatialFilter': N.array(FilterLowPass, dtype=N.int32),
+ 'missingValue': default_fillvals['f4']}
+
+
+
+def climByAveragingPeriods(urls, # list of (daily) granule URLs for a long time period (e.g. a year)
+ nEpochs, # compute a climatology for every N epochs (days) by 'averaging'
+ nWindow, # number of epochs in window needed for averaging
+ nNeighbors, # number of neighbors on EACH side in lat/lon directions to use in averaging
+ variable, # name of primary variable in file
+ mask, # name of mask variable
+ coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
+ splitFn=splitModisSst, # split function to use to partition the input URL list
+ maskFn=qcMask, # mask function to compute mask from mask variable
+ averager='pixelMean', # averaging function to use, one of ['pixelMean', 'gaussInterp', 'spatialFilter']
+ averagingConfig={}, # dict of parameters to control the averaging function (e.g. gaussInterp)
+ optimization='fortran', # optimization mode (fortran or cython)
+ mode='sequential', # Map across time periods of N-days for concurrent work, executed by:
+ # 'sequential' map, 'multicore' using pool.map(), 'cluster' using pathos pool.map(),
+ # or 'spark' using PySpark
+ numNodes=1, # number of cluster nodes to use
+ nWorkers=4, # number of parallel workers per node
+ averagingFunctions=AveragingFunctions, # dict of possible averaging functions
+ legalModes=ExecutionModes, # list of possible execution modes
+ cachePath=CachePath # directory to cache retrieved files in
+ ):
+ '''Compute a climatology every N days by applying a mask and averaging function.
+Writes the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
+***Assumption: This routine assumes that the N grids will fit in memory.***
+ '''
+ if averagingConfig['name'] == 'gaussInterp':
+ averagingConfig['wlat'] = nNeighbors
+ averagingConfig['wlon'] = nNeighbors
+ try:
+ averageFn = averagingFunctions[averager]
+ except:
+ print >>sys.stderr, 'climatology: Error, Averaging function must be one of: %s' % str(averagingFunctions)
+ sys.exit(1)
+
+ urlSplits = [s for s in splitFn(urls, nEpochs)]
+
+ def climsContoured(urls, plot=None, fillValue=default_fillvals['f4'], format='NETCDF4', cachePath=cachePath):
+ n = len(urls)
+ if VERBOSE: print >>sys.stderr, urls
+
+ var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn, averagingConfig, optimization, cachePath)
+
+ fn = os.path.split(urls[0])[1]
+ inFile = os.path.join(cachePath, fn)
+ method = averagingConfig['name']
+ fn = os.path.splitext(fn)[0]
+ day = fn[5:8]
+        nDays = len(var['time'])             # number of epochs averaged into this climatology
+
+ if 'wlat' in averagingConfig:
+ wlat = averagingConfig['wlat']
+ else:
+ wlat = 1
+ if int(wlat) == wlat:
+ outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%dnbrs.nc' % (day, nDays, method, int(wlat)) # mark each file with first day in period
+ else:
+ outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%4.2fnbrs.nc' % (day, nDays, method, wlat) # mark each file with first day in period
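+        # e.g. day='303', nDays=5, method='gaussInterp', wlat=3
+        #   -> 'A303.L3m_5day_clim_sst_4km_gaussInterp_3nbrs.nc'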
+
+ outFile = writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue, format)
+
+ if plot == 'contour':
+ figFile = contourMap(var, variable, coordinates, n, outFile)
+ elif plot == 'histogram':
+# figFile = histogram(var, variable, n, outFile)
+ figFile = None
+ else:
+ figFile = None
+ return (outFile, figFile)
+
+ if mode == 'sequential':
+ results = map(climsContoured, urlSplits)
+ elif mode == 'multicore':
+ pool = Pool(nWorkers)
+ results = pool.map(climsContoured, urlSplits)
+ elif mode == 'cluster':
+ pass
+ elif mode == 'spark':
+ pass
+
+ return results
+
+
+def climByAveraging(urls, # list of granule URLs for a time period
+ variable, # name of primary variable in file
+ mask, # name of mask variable
+ coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
+ maskFn=qcMask, # mask function to compute mask from mask variable
+ averageFn=mean, # averaging function to use
+ averagingConfig={}, # parameters to control averaging function (e.g. gaussInterp)
+ optimization='fortran', # optimization mode (fortran or cython)
+ cachePath=CachePath
+ ):
+ '''Compute a climatology over N arrays by applying a mask and averaging function.
+Returns the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
+***Assumption: This routine assumes that the N grids will fit in memory.***
+ '''
+ n = len(urls)
+ varList = [variable, mask]
+ var = {}
+ vtime = N.zeros((n,), N.int32)
+
+ for i, url in enumerate(urls):
+ try:
+ path = retrieveFile(url, cachePath)
+ fn = os.path.split(path)[1]
+ vtime[i] = int(fn[5:8]) # KLUDGE: extract DOY from filename
+        except:
+            print >>sys.stderr, 'climByAveraging: Error, continuing without file %s' % url
+            if i > 0: accum[i] = emptyVar    # accum/emptyVar exist only after the first good file
+            continue
+ if path is None: continue
+ print >>sys.stderr, 'Read variables and mask ...'
+ try:
+ var, fh = getVariables(path, varList, arrayOnly=True, order='F', set_auto_mask=False) # return dict of variable objects by name
+        except:
+            print >>sys.stderr, 'climByAveraging: Error, cannot read file %s' % path
+            if i > 0: accum[i] = emptyVar    # accum/emptyVar exist only after the first good file
+            continue
+ if i == 0:
+ dtype = var[variable].dtype
+ if 'int' in dtype.name: dtype = N.float32
+ shape = (n,) + var[variable].shape
+ accum = N.ma.empty(shape, dtype, order='F')
+ emptyVar = N.array(N.ma.masked_all(var[variable].shape, dtype), order='F') # totally masked variable array for missing or bad file reads
+
+ print >>sys.stderr, 'Read coordinates ...'
+ var, fh = getVariables(path, coordinates, var, arrayOnly=True, order='F') # read coordinate arrays and add to dict
+
+ var[variable] = maskFn(var[variable], var[mask]) # apply quality mask variable to get numpy MA, turned off masking done by netCDF4 library
+# var[variable] = var[variable][:]
+
+ # Echo variable range for sanity check
+ vals = var[variable].compressed()
+ print >>sys.stderr, 'Variable Range: min, max:', vals.min(), vals.max()
+
+ # Plot input grid
+# figFile = histogram(vals, variable, n, os.path.split(path)[1])
+# figFile = contourMap(var, variable, coordinates[1:], n, os.path.split(path)[1])
+
+        accum[i] = var[variable]             # accumulate N arrays for 'averaging'
+# if i != 0 and i+1 != n: close(fh) # REMEMBER: closing fh loses netCDF4 in-memory data structures
+ close(fh)
+
+ coordinates = ['time'] + coordinates # add constructed time (days) as first coordinate
+ var['time'] = vtime
+
+ if averagingConfig['name'] == 'pixelMean':
+ print >>sys.stderr, 'Doing Pixel Average over %d grids ...' % n
+ start = time.time()
+ avg = averageFn(accum) # call pixel averaging function
+ end = time.time()
+ print >>sys.stderr, 'pixelMean execution time:', (end - start)
+ outlat = var[coordinates[1]].astype(N.float32)[:]
+ outlon = var[coordinates[2]].astype(N.float32)[:]
+ elif averagingConfig['name'] == 'gaussInterp':
+ print >>sys.stderr, 'Doing Gaussian Interpolation over %d grids ...' % n
+ var[variable] = accum
+ c = averagingConfig
+ latGrid = c['latGrid']; lonGrid = c['lonGrid']
+ if latGrid is not None and lonGrid is not None:
+ outlat = N.arange(latGrid[0], latGrid[1]+latGrid[2], latGrid[2], dtype=N.float32, order='F')
+ outlon = N.arange(lonGrid[0], lonGrid[1]+lonGrid[2], lonGrid[2], dtype=N.float32, order='F')
+ else:
+ outlat = N.array(var[coordinates[1]], dtype=N.float32, order='F')
+ outlon = N.array(var[coordinates[2]], dtype=N.float32, order='F')
+ varNames = [variable] + coordinates
+ start = time.time()
+ avg, weight, status = \
+ gaussInterp(var, varNames, outlat, outlon, c['wlat'], c['wlon'],
+ c['slat'], c['slon'], c['stime'], c['vfactor'], c['missingValue'],
+ VERBOSE, optimization)
+ end = time.time()
+ var['outweight'] = weight.astype(N.float32)
+ print >>sys.stderr, 'gaussInterp execution time:', (end - start)
+ elif averagingConfig['name'] == 'spatialFilter':
+ print >>sys.stderr, 'Applying Spatial 3x3 Filter and then averaging over %d grids ...' % n
+ var[variable] = accum
+ c = averagingConfig
+ varNames = [variable] + coordinates
+ start = time.time()
+ avg, count, status = \
+ spatialFilter(var, varNames, c['spatialFilter'], c['normalization'],
+ c['missingValue'], VERBOSE, optimization)
+ end = time.time()
+ print >>sys.stderr, 'spatialFilter execution time:', (end - start)
+ outlat = var[coordinates[1]].astype(N.float32)[:]
+ outlon = var[coordinates[2]].astype(N.float32)[:]
+
+ var['out'+variable] = avg.astype(N.float32) # return primary variable & mask arrays in dict
+ var['out'+mask] = N.ma.getmask(avg)
+ var['outlat'] = outlat
+ var['outlon'] = outlon
+ return var
+
+
+def writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue=None, format='NETCDF4'):
+ '''Construct output bundle of arrays with NetCDF dimensions and attributes.
+Output variables and attributes will have same names as the input file.
+ '''
+ din = Dataset(inFile, 'r')
+ dout = Dataset(outFile, 'w', format=format)
+ print >>sys.stderr, 'Writing %s ...' % outFile
+
+ # Transfer global attributes from input file
+ for a in din.ncattrs():
+ dout.setncattr(a, din.getncattr(a))
+
+ # Add dimensions and variables, copying data
+ coordDim = [dout.createDimension(coord, var['out'+coord].shape[0]) for coord in coordinates] # here output lon, lat, etc.
+ for coord in coordinates:
+ v = dout.createVariable(coord, var['out'+coord].dtype, (coord,))
+ v[:] = var['out'+coord][:]
+ primaryVar = dout.createVariable(variable, var['out'+variable].dtype, coordinates, fill_value=fillValue)
+ primaryVar[:] = var['out'+variable][:] # transfer array
+ maskVar = dout.createVariable(mask, 'i1', coordinates)
+ maskVar[:] = var['out'+mask].astype('i1')[:]
+
+ # Transfer variable attributes from input file
+ for k,v in dout.variables.iteritems():
+ for a in din.variables[k].ncattrs():
+ if a == 'scale_factor' or a == 'add_offset' or a == '_FillValue': continue
+ v.setncattr(a, din.variables[k].getncattr(a))
+ if k == variable:
+ try:
+# if fillValue == None: fillValue = din.variables[k].getncattr('_FillValue') # total kludge
+ if fillValue == None: fillValue = default_fillvals['f4']
+# print >>sys.stderr, default_fillvals
+# v.setncattr('_FillValue', fillValue) # set proper _FillValue for climatology array
+ v.setncattr('missing_value', fillValue)
+ print >>sys.stderr, 'Setting missing_value for primary variable %s to %f' % (variable, fillValue)
+ except:
+ print >>sys.stderr, 'writeOutNetcdfVars: Warning, for variable %s no fill value specified or derivable from inputs.' % variable
+ din.close()
+ dout.close()
+ return outFile
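+
+# Quick sanity check (illustrative only), e.g. for variable='sst':
+#   d = Dataset(outFile, 'r'); print d.variables['sst'].shape; d.close()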
+
+
+def contourMap(var, variable, coordinates, n, outFile):
+    figFile = os.path.splitext(outFile)[0] + '.png'   # contour image (avoid clobbering the histogram's _hist.png)
+ # TODO: Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays
+ vals = var[variable][:]
+
+ # Fixed color scale, write file, turn off auto borders, set title, reverse lat direction so monotonically increasing??
+ imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:],
+ vmin=-2., vmax=45., outFile=figFile, autoBorders=False,
+ title='%s %d-day Mean from %s' % (variable.upper(), n, outFile))
+ print >>sys.stderr, 'Writing contour plot to %s' % figFile
+ return figFile
+
+
+def histogram(vals, variable, n, outFile):
+ figFile = os.path.splitext(outFile)[0] + '_hist.png'
+ M.clf()
+# M.hist(vals, 47, (-2., 45.))
+ M.hist(vals, 94)
+ M.xlim(-5, 45)
+ M.xlabel('SST in degrees Celsius')
+ M.ylim(0, 300000)
+ M.ylabel('Count')
+ M.title('Histogram of %s %d-day Mean from %s' % (variable.upper(), n, outFile))
+ M.show()
+ print >>sys.stderr, 'Writing histogram plot to %s' % figFile
+ M.savefig(figFile)
+ return figFile
+
+
+def dailyFile2date(path, offset=1):
+    '''Convert the YYYYDDD string in a filename (e.g. A2010303...) to a YYYY/MM/DD date string.'''
+    fn = os.path.split(path)[1]
+    year = int(fn[offset:offset+4])
+    doy = int(fn[offset+4:offset+7])         # assumes the AYYYYDDD naming convention used elsewhere in this module
+    return datetime.datetime.strptime('%04d %03d' % (year, doy), '%Y %j').strftime('%Y/%m/%d')
+
+
+def formatRegion(r):
+ """Format lat/lon region specifier as string suitable for file name."""
+ if isinstance(r, str):
+ return r
+ else:
+ strs = [str(i).replace('-', 'm') for i in r]
+ return 'region-%s-%sby%s-%s' % tuple(strs)
+
+
+def formatGrid(r):
+ """Format lat/lon grid resolution specifier as string suitable for file name."""
+ if isinstance(r, str):
+ return r
+ else:
+ return str(r[0]) + 'by' + str(r[1])
+
+
+def main(args):
+ nEpochs = int(args[0])
+ nWindow = int(args[1])
+ nNeighbors = int(args[2])
+ averager = args[3]
+ optimization = args[4]
+ mode = args[5]
+ nWorkers = int(args[6])
+ urlFile = args[7]
+ urls = [s.strip() for s in open(urlFile, 'r')]
+ if averager == 'gaussInterp':
+ results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
+ averager=averager, optimization=optimization, averagingConfig=GaussInterpConfig,
+ mode=mode, nWorkers=nWorkers)
+ elif averager == 'spatialFilter':
+ results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
+ averager=averager, optimization=optimization, averagingConfig=SpatialFilterConfig1,
+ mode=mode, nWorkers=nWorkers)
+ elif averager == 'pixelMean':
+ results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
+ averager=averager, optimization=optimization, averagingConfig=PixelMeanConfig,
+ mode=mode, nWorkers=nWorkers)
+ else:
+ print >>sys.stderr, 'climatology2: Error, averager must be one of', AveragingFunctions.keys()
+ sys.exit(1)
+
+ if results[0][1] is not None:
+ makeMovie([r[1] for r in results], 'clim.mpg')
+ return results
+
+
+if __name__ == '__main__':
+ print main(sys.argv[1:])
+
+
+# Old Tests:
+# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_10days.txt
+# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_10days.txt
+
+# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_40days.txt
+# python climatology2.py 5 5 0 pixelMean fortran multicore 8 urls_sst_40days.txt
+# python climatology2.py 5 5 3 gaussInterp fortran multicore 8 urls_sst_40days.txt
+
+# Old Production:
+# python climatology2.py 5 5 0 pixelMean fortran multicore 16 urls_sst_2015.txt >& log &
+# python climatology2.py 10 10 0 pixelMean fortran multicore 16 urls_sst_2015.txt >& log &
+# python climatology2.py 5 5 3 gaussInterp fortran multicore 16 urls_sst_2015.txt >& log &
+
+
+# Tests:
+# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 0 pixelMean fortran multicore 4 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 3 gaussInterp fortran multicore 4 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 7 1 spatialFilter fortran sequential 1 urls_sst_daynight_5days_sorted.txt
+
+# Test number of neighbors needed:
+# python climatology2.py 5 7 3 gaussInterp fortran multicore 4 urls_sst_daynight_20days_sorted.txt
+
+
+# Production:
+# python climatology2.py 5 7 0 pixelMean fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 3 gaussInterp fortran sequential 1 urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 3 gaussInterp fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 1 spatialFilter fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt
+
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/climatology3Spark.py
----------------------------------------------------------------------
diff --git a/climatology/clim/climatology3Spark.py b/climatology/clim/climatology3Spark.py
new file mode 100755
index 0000000..14a5f59
--- /dev/null
+++ b/climatology/clim/climatology3Spark.py
@@ -0,0 +1,418 @@
+"""
+climatology3Spark.py
+
+Compute a multi-epoch (multi-day) climatology from daily SST Level-3 grids.
+
+Simple code to be run on Spark cluster, or using multi-core parallelism on single machine.
+
+"""
+
+import sys, os, urlparse, urllib, re, time, datetime
+import numpy as N
+import matplotlib
+matplotlib.use('Agg')
+import matplotlib.pylab as M
+
+from variables import getVariables, close
+from cache import retrieveFile, CachePath
+from split import fixedSplit, splitByNDays
+from netCDF4 import Dataset, default_fillvals
+from plotlib import imageMap, makeMovie
+
+from spatialFilter import spatialFilter
+from gaussInterp import gaussInterp # calls into Fortran version gaussInterp_f.so
+#from gaussInterp_slow import gaussInterp_slow as gaussInterp # pure python, slow debuggable version
+
+VERBOSE = 1
+
+# Possible execution modes
+ExecutionModes = ['multicore', 'spark']
+
+# SST L3m 4.6km Metadata
+# SST values are scaled integers in degrees Celsius; the lon/lat grid is 8640 x 4320
+# Variable = 'sst', Mask = 'qual_sst', Coordinates = ['lon', 'lat']
+
+# Generate algorithmic name for N-day Climatology product
+SSTClimatologyTemplate = 'SST.L3.Global.Clim.%(period)s.%(date)s.%(version)s.nc' #??
+
+# Simple mask and average functions to get us started, then add gaussian interpolation.
+# MODIS L3 SST product: qual_sst ranges over [-1, 2]; 0 flags the best data, and including 1 improves coverage
+#def qcMask(var, mask): return N.ma.array(var, mask=N.ma.make_mask(mask == 0))
+
+def qcMask(var, mask): return N.ma.masked_where(mask != 0, var)
+#def qcMask(var, mask): return N.ma.masked_where(mask < 0, var)
+
+def splitModisSst(seq, n):
+ for chunk in splitByNDays(seq, n, re.compile(r'(...).L3m')):
+ yield chunk
+
+def mean(a): return N.ma.mean(a, axis=0)
+
+AveragingFunctions = {'pixelMean': mean, 'gaussInterp': gaussInterp, 'spatialFilter': spatialFilter}
+
+PixelMeanConfig = {'name': 'pixelMean'}
+
+GaussInterpConfig = {'name': 'gaussInterp',
+ 'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid
+ 'wlat': 3, 'wlon': 3,
+ 'slat': 0.15, 'slon': 0.15, 'stime': 1,
+ 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+GaussInterpConfig1a = {'name': 'gaussInterp',
+ 'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid
+ 'wlat': 0.30, 'wlon': 0.30,
+ 'slat': 0.15, 'slon': 0.15, 'stime': 1,
+ 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+GaussInterpConfig1b = {'name': 'gaussInterp',
+ 'latGrid': None, 'lonGrid': None, # None means use input lat/lon grid
+ 'wlat': 0.08, 'wlon': 0.08,
+ 'slat': 0.15, 'slon': 0.15, 'stime': 1,
+ 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+GaussInterpConfig2 = {'name': 'gaussInterp',
+ 'latGrid': (89.5, -89.5, -0.25), 'lonGrid': (-180., 179., 0.25),
+ 'wlat': 2., 'wlon': 2.,
+ 'slat': 0.15, 'slon': 0.15, 'stime': 1,
+ 'vfactor': -0.6931, 'missingValue': default_fillvals['f4']}
+
+FilterGaussian = [[1, 2, 1], [2, 4, 2], [1, 2, 1]] # divide by 16
+FilterLowPass = [[1, 1, 1], [1, 1, 1], [1, 1, 1]] # divide by 9
+
+SpatialFilterConfig1 = {'name': 'spatialFilter', 'normalization': 16.,
+ 'spatialFilter': N.array(FilterGaussian, dtype=N.int32),
+ 'missingValue': default_fillvals['f4']}
+
+SpatialFilterConfig2 = {'name': 'spatialFilter', 'normalization': 9.,
+ 'spatialFilter': N.array(FilterLowPass, dtype=N.int32),
+ 'missingValue': default_fillvals['f4']}
+
+# Directory to cache retrieved files in (overrides CachePath imported from cache)
+CachePath = os.path.expanduser('~/cache')
+
+
+
+def climByAveragingPeriods(urls, # list of (daily) granule URLs for a long time period (e.g. a year)
+ nEpochs, # compute a climatology for every N epochs (days) by 'averaging'
+                           nWindow,                # number of epochs in window needed for averaging
+                           nNeighbors,             # number of neighbors on EACH side in lat/lon directions to use in averaging
+ variable, # name of primary variable in file
+ mask, # name of mask variable
+ coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
+ splitFn=splitModisSst, # split function to use to partition the input URL list
+ maskFn=qcMask, # mask function to compute mask from mask variable
+ averager='pixelMean', # averaging function to use, one of ['pixelMean', 'gaussInterp', 'spatialFilter']
+ averagingConfig={}, # dict of parameters to control the averaging function (e.g. gaussInterp)
+ optimization='fortran', # optimization mode (fortran or cython)
+ mode='multicore', # Map across time periods of N-days for concurrent work, executed by:
+ # 'multicore' using pysparkling or 'spark' using Spark/Mesos cluster
+ numNodes=1, # number of cluster nodes to use
+ nWorkers=4, # number of parallel workers per node
+ averagingFunctions=AveragingFunctions, # dict of possible averaging functions
+ legalModes=ExecutionModes, # list of possible execution modes
+ cachePath=CachePath # directory to cache retrieved files in
+ ):
+ '''Compute a climatology every N days by applying a mask and averaging function.
+Writes the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
+***Assumption: This routine assumes that the N grids will fit in memory.***
+ '''
+ if averagingConfig['name'] == 'gaussInterp':
+ averagingConfig['wlat'] = nNeighbors
+ averagingConfig['wlon'] = nNeighbors
+ try:
+ averageFn = averagingFunctions[averager]
+ except:
+ print >>sys.stderr, 'climatology: Error, Averaging function must be one of: %s' % str(averagingFunctions)
+ sys.exit(1)
+
+ urlSplits = [s for s in splitFn(urls, nEpochs)]
+
+    # TODO: dispatch climsContoured over urlSplits via pysparkling ('multicore') or a
+    # SparkContext ('spark'); this Spark driver is still a work in progress.
+    if mode == 'multicore':
+        pass
+    elif mode == 'spark':
+        pass
+
+ def climsContoured(urls, plot=None, fillValue=default_fillvals['f4'], format='NETCDF4', cachePath=cachePath):
+ n = len(urls)
+ if VERBOSE: print >>sys.stderr, urls
+
+        # TODO: climByAveraging is not defined in this module; port this call to
+        # accumulateClim() below, which returns (vcount, vsum) rather than a var dict.
+        var = climByAveraging(urls, variable, mask, coordinates, maskFn, averageFn, averagingConfig, optimization, cachePath)
+
+ fn = os.path.split(urls[0])[1]
+ inFile = os.path.join(cachePath, fn)
+ method = averagingConfig['name']
+ fn = os.path.splitext(fn)[0]
+ day = fn[5:8]
+        nDays = len(var['time'])             # number of epochs averaged into this climatology
+
+ if 'wlat' in averagingConfig:
+ wlat = averagingConfig['wlat']
+ else:
+ wlat = 1
+ if int(wlat) == wlat:
+ outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%dnbrs.nc' % (day, nDays, method, int(wlat)) # mark each file with first day in period
+ else:
+ outFile = 'A%s.L3m_%dday_clim_sst_4km_%s_%4.2fnbrs.nc' % (day, nDays, method, wlat) # mark each file with first day in period
+
+ outFile = writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue, format)
+
+ if plot == 'contour':
+ figFile = contourMap(var, variable, coordinates, n, outFile)
+ elif plot == 'histogram':
+# figFile = histogram(var, variable, n, outFile)
+ figFile = None
+ else:
+ figFile = None
+ return (outFile, figFile)
+
+
+    results = []                             # TODO: collect (outFile, figFile) tuples from the chosen execution mode
+    return results
+
+
+def accumulateClim(urls, # list of granule URLs for a time period
+ variable, # name of primary variable in file
+ mask, # name of mask variable
+ coordinates, # names of coordinate arrays to read and pass on (e.g. 'lat' and 'lon')
+ maskFn=qcMask, # mask function to compute mask from mask variable
+ averageFn=mean, # averaging function to use
+ averagingConfig={}, # parameters to control averaging function (e.g. gaussInterp)
+ optimization='fortran', # optimization mode (fortran or cython)
+ cachePath=CachePath
+ ):
+ '''Compute a climatology over N arrays by applying a mask and averaging function.
+Returns the averaged variable grid, attributes of the primary variable, and the coordinate arrays in a dictionary.
+***Assumption: This routine assumes that the N grids will fit in memory.***
+ '''
+ print >>sys.stderr, 'accumulateClim: Doing %s ...' % averagingConfig['name']
+ varList = [variable, mask]
+ for i, url in enumerate(urls):
+ try:
+ path = retrieveFile(url, cachePath)
+ fn = os.path.split(path)[1]
+ vtime = int(fn[5:8]) # KLUDGE: extract DOY from filename
+ except:
+            print >>sys.stderr, 'accumulateClim: Error, continuing without file %s' % url
+ continue
+ if path is None: continue
+ try:
+ print >>sys.stderr, 'Reading file %s ...' % path
+ var, fh = getVariables(path, varList, arrayOnly=True, order='F', set_auto_mask=False) # return dict of variable objects by name
+ except:
+            print >>sys.stderr, 'accumulateClim: Error, cannot read file %s' % path
+ continue
+ if i == 0:
+ dtype = var[variable].dtype
+ if 'int' in dtype.name: dtype = N.float32
+ shape = var[variable].shape
+            vsum = N.ma.zeros(shape, dtype, order='F')     # zero-initialize the accumulators
+            vcount = N.ma.zeros(shape, dtype, order='F')
+ emptyVar = N.array(N.ma.masked_all(var[variable].shape, dtype), order='F') # totally masked variable array for missing or bad file reads
+
+ print >>sys.stderr, 'Read coordinates ...'
+ var, fh = getVariables(path, coordinates, var, arrayOnly=True, order='F') # read coordinate arrays and add to dict
+
+ var[variable] = maskFn(var[variable], var[mask]) # apply quality mask variable to get numpy MA, turned off masking done by netCDF4 library
+
+ # Echo variable range for sanity check
+ vals = var[variable].compressed()
+ print >>sys.stderr, 'Variable Range: min, max:', vals.min(), vals.max()
+
+ if averagingConfig['name'] == 'pixelMean':
+            vsum += var[variable].filled(0.)               # accumulate good values only
+            vcount += ~N.ma.getmaskarray(var[variable])    # count unmasked (good) pixels
+
+ elif averagingConfig['name'] == 'gaussInterp':
+            # var[variable] already holds this granule's masked grid; no multi-epoch accumulation here
+ c = averagingConfig
+ latGrid = c['latGrid']; lonGrid = c['lonGrid']
+ if latGrid is not None and lonGrid is not None:
+ outlat = N.arange(latGrid[0], latGrid[1]+latGrid[2], latGrid[2], dtype=N.float32, order='F')
+ outlon = N.arange(lonGrid[0], lonGrid[1]+lonGrid[2], lonGrid[2], dtype=N.float32, order='F')
+ else:
+ outlat = N.array(var[coordinates[1]], dtype=N.float32, order='F')
+ outlon = N.array(var[coordinates[2]], dtype=N.float32, order='F')
+ varNames = [variable] + coordinates
+ start = time.time()
+ avg, weight, status = \
+ gaussInterp(var, varNames, outlat, outlon, c['wlat'], c['wlon'],
+ c['slat'], c['slon'], c['stime'], c['vfactor'], c['missingValue'],
+ VERBOSE, optimization)
+ end = time.time()
+ vcount = weight.astype(N.float32)
+ vsum = avg
+ print >>sys.stderr, 'gaussInterp execution time:', (end - start)
+ elif averagingConfig['name'] == 'spatialFilter':
+            # var[variable] already holds this granule's masked grid; no multi-epoch accumulation here
+ c = averagingConfig
+ varNames = [variable] + coordinates
+ start = time.time()
+ avg, vcount, status = \
+ spatialFilter(var, varNames, c['spatialFilter'], c['normalization'],
+ c['missingValue'], VERBOSE, optimization)
+ vsum = avg
+ end = time.time()
+ print >>sys.stderr, 'spatialFilter execution time:', (end - start)
+ close(fh)
+
+ return (vcount, vsum)
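+
+# Combining per-split results (illustrative sketch only): with an assumed SparkContext
+# sc, the per-split (vcount, vsum) pairs reduce to a climatology, e.g.:
+#   vcount, vsum = sc.parallelize(urlSplits) \
+#                    .map(lambda u: accumulateClim(u, variable, mask, coordinates,
+#                                                  averagingConfig=PixelMeanConfig)) \
+#                    .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
+#   avg = vsum / N.ma.masked_equal(vcount, 0.)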
+
+
+def writeOutNetcdfVars(var, variable, mask, coordinates, inFile, outFile, fillValue=None, format='NETCDF4'):
+ '''Construct output bundle of arrays with NetCDF dimensions and attributes.
+Output variables and attributes will have same names as the input file.
+ '''
+ din = Dataset(inFile, 'r')
+ dout = Dataset(outFile, 'w', format=format)
+ print >>sys.stderr, 'Writing %s ...' % outFile
+
+ # Transfer global attributes from input file
+ for a in din.ncattrs():
+ dout.setncattr(a, din.getncattr(a))
+
+ # Add dimensions and variables, copying data
+ coordDim = [dout.createDimension(coord, var['out'+coord].shape[0]) for coord in coordinates] # here output lon, lat, etc.
+ for coord in coordinates:
+ v = dout.createVariable(coord, var['out'+coord].dtype, (coord,))
+ v[:] = var['out'+coord][:]
+ primaryVar = dout.createVariable(variable, var['out'+variable].dtype, coordinates, fill_value=fillValue)
+ primaryVar[:] = var['out'+variable][:] # transfer array
+ maskVar = dout.createVariable(mask, 'i1', coordinates)
+ maskVar[:] = var['out'+mask].astype('i1')[:]
+
+ # Transfer variable attributes from input file
+ for k,v in dout.variables.iteritems():
+ for a in din.variables[k].ncattrs():
+ if a == 'scale_factor' or a == 'add_offset' or a == '_FillValue': continue
+ v.setncattr(a, din.variables[k].getncattr(a))
+ if k == variable:
+ try:
+# if fillValue == None: fillValue = din.variables[k].getncattr('_FillValue') # total kludge
+ if fillValue == None: fillValue = default_fillvals['f4']
+# print >>sys.stderr, default_fillvals
+# v.setncattr('_FillValue', fillValue) # set proper _FillValue for climatology array
+ v.setncattr('missing_value', fillValue)
+ print >>sys.stderr, 'Setting missing_value for primary variable %s to %f' % (variable, fillValue)
+ except:
+ print >>sys.stderr, 'writeOutNetcdfVars: Warning, for variable %s no fill value specified or derivable from inputs.' % variable
+ din.close()
+ dout.close()
+ return outFile
+
+
+def contourMap(var, variable, coordinates, n, outFile):
+    figFile = os.path.splitext(outFile)[0] + '.png'   # contour image (avoid clobbering the histogram's _hist.png)
+ # TODO: Downscale variable array (SST) before contouring, matplotlib is TOO slow on large arrays
+ vals = var[variable][:]
+
+ # Fixed color scale, write file, turn off auto borders, set title, reverse lat direction so monotonically increasing??
+ imageMap(var[coordinates[1]][:], var[coordinates[0]][:], var[variable][:],
+ vmin=-2., vmax=45., outFile=figFile, autoBorders=False,
+ title='%s %d-day Mean from %s' % (variable.upper(), n, outFile))
+ print >>sys.stderr, 'Writing contour plot to %s' % figFile
+ return figFile
+
+
+def histogram(vals, variable, n, outFile):
+ figFile = os.path.splitext(outFile)[0] + '_hist.png'
+ M.clf()
+# M.hist(vals, 47, (-2., 45.))
+ M.hist(vals, 94)
+ M.xlim(-5, 45)
+ M.xlabel('SST in degrees Celsius')
+ M.ylim(0, 300000)
+ M.ylabel('Count')
+ M.title('Histogram of %s %d-day Mean from %s' % (variable.upper(), n, outFile))
+ M.show()
+ print >>sys.stderr, 'Writing histogram plot to %s' % figFile
+ M.savefig(figFile)
+ return figFile
+
+
+def dailyFile2date(path, offset=1):
+    '''Convert the YYYYDDD string in a filename (e.g. A2010303...) to a YYYY/MM/DD date string.'''
+    fn = os.path.split(path)[1]
+    year = int(fn[offset:offset+4])
+    doy = int(fn[offset+4:offset+7])         # assumes the AYYYYDDD naming convention used elsewhere in this module
+    return datetime.datetime.strptime('%04d %03d' % (year, doy), '%Y %j').strftime('%Y/%m/%d')
+
+
+def formatRegion(r):
+ """Format lat/lon region specifier as string suitable for file name."""
+ if isinstance(r, str):
+ return r
+ else:
+ strs = [str(i).replace('-', 'm') for i in r]
+ return 'region-%s-%sby%s-%s' % tuple(strs)
+
+
+def formatGrid(r):
+ """Format lat/lon grid resolution specifier as string suitable for file name."""
+ if isinstance(r, str):
+ return r
+ else:
+ return str(r[0]) + 'by' + str(r[1])
+
+
+def main(args):
+ nEpochs = int(args[0])
+ nWindow = int(args[1])
+ nNeighbors = int(args[2])
+ averager = args[3]
+ optimization = args[4]
+ mode = args[5]
+ nWorkers = int(args[6])
+ urlFile = args[7]
+ urls = [s.strip() for s in open(urlFile, 'r')]
+ if averager == 'gaussInterp':
+ results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
+ averager=averager, optimization=optimization, averagingConfig=GaussInterpConfig,
+ mode=mode, nWorkers=nWorkers)
+ elif averager == 'spatialFilter':
+ results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
+ averager=averager, optimization=optimization, averagingConfig=SpatialFilterConfig1,
+ mode=mode, nWorkers=nWorkers)
+ elif averager == 'pixelMean':
+ results = climByAveragingPeriods(urls, nEpochs, nWindow, nNeighbors, 'sst', 'qual_sst', ['lat', 'lon'],
+ averager=averager, optimization=optimization, averagingConfig=PixelMeanConfig,
+ mode=mode, nWorkers=nWorkers)
+ else:
+        print >>sys.stderr, 'climatology3Spark: Error, averager must be one of', AveragingFunctions.keys()
+ sys.exit(1)
+
+ if results[0][1] is not None:
+ makeMovie([r[1] for r in results], 'clim.mpg')
+ return results
+
+
+if __name__ == '__main__':
+ print main(sys.argv[1:])
+
+
+# Old Tests (command lines inherited from climatology2.py):
+# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_10days.txt
+# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_10days.txt
+
+# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_40days.txt
+# python climatology2.py 5 5 0 pixelMean fortran multicore 8 urls_sst_40days.txt
+# python climatology2.py 5 5 3 gaussInterp fortran multicore 8 urls_sst_40days.txt
+
+# Old Production:
+# python climatology2.py 5 5 0 pixelMean fortran multicore 16 urls_sst_2015.txt >& log &
+# python climatology2.py 10 10 0 pixelMean fortran multicore 16 urls_sst_2015.txt >& log &
+# python climatology2.py 5 5 3 gaussInterp fortran multicore 16 urls_sst_2015.txt >& log &
+
+
+# Tests:
+# python climatology2.py 5 5 0 pixelMean fortran sequential 1 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 0 pixelMean fortran multicore 4 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 3 gaussInterp fortran sequential 1 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 5 3 gaussInterp fortran multicore 4 urls_sst_daynight_5days_sorted.txt
+# python climatology2.py 5 7 1 spatialFilter fortran sequential 1 urls_sst_daynight_5days_sorted.txt
+
+# Test number of neighbors needed:
+# python climatology2.py 5 7 3 gaussInterp fortran multicore 4 urls_sst_daynight_20days_sorted.txt
+
+
+# Production:
+# python climatology2.py 5 7 0 pixelMean fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 3 gaussInterp fortran sequential 1 urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 3 gaussInterp fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt
+# python climatology2.py 5 7 1 spatialFilter fortran multicore 4 urls_sst_daynight_2003_2015_sorted.txt
+
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/cluster.py
----------------------------------------------------------------------
diff --git a/climatology/clim/cluster.py b/climatology/clim/cluster.py
new file mode 100644
index 0000000..7a76ad9
--- /dev/null
+++ b/climatology/clim/cluster.py
@@ -0,0 +1,84 @@
+#
+# cluster.py -- Parallel map function that uses a distributed cluster and multicore within each node
+#
+
+import os, sys
+from split import fixedSplit
+import pp
+#import dispy
+
+Servers = ("server-1", "server-2", "server-3", "server-4")
+NCores = 8
+
+
+def splitSeq(seq, n):
+ '''Split a sequence into N almost even parts.'''
+    m = max(1, int(len(seq) / n))            # guard against zero-length chunks when len(seq) < n
+ return fixedSplit(seq, m)
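+# e.g. splitSeq(range(10), 5) -> five consecutive chunks of length 2
+# (assuming fixedSplit yields consecutive length-m chunks)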
+
+
+def dmap(func, # function to parallel map across split sequence
+ seq, # sequence of data
+ nCores, # number of cores to use on each node
+ servers=[], # list of servers in the cluster
+ splitterFn=splitSeq, # function to use to split the sequence of data
+ ):
+ '''Parallel operations for a cluster of nodes, each with multiple cores.
+ '''
+ nServers = len(servers)
+    if nServers == 0:
+        servers = ('localhost',)
+        nServers = 1                         # fall back to a single local server
+ jobServer = pp.Server(nCores, ppservers=servers)
+ splits = splitSeq(seq, nServers * nCores)
+    jobs = [(s, jobServer.submit(func, (s,), (splitterFn,), globals=globals())) for s in splits]
+ results = [(j[0], j[1]()) for j in jobs]
+ return results
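+
+# Usage sketch (illustrative only): split range(100) across 8 cores per server and
+# apply len to each chunk, returning a list of (chunk, result) pairs:
+#   results = dmap(len, range(100), 8, servers=('server-1',))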
+
+
+class PPCluster:
+ '''Parallel operations for a cluster of nodes, each with multiple cores.
+ '''
+ def __init__(self, nCores=NCores, # number of cores to use on each node
+ servers=Servers, # list of servers in the cluster
+ splitterFn=splitSeq, # function to use to split the sequence of data
+ ):
+ self.nCores = nCores
+ self.servers = servers
+ self.nServers = len(servers)
+ self.splitterFn = splitterFn
+ self.jobServer = pp.Server(nCores, ppservers=servers)
+
+ def dmap(self, func, seq):
+ '''Distributed map function that automatically uses a cluster of nodes and a set number of cores.
+A sequence of data is split across the cluster.
+ '''
+ splits = splitSeq(seq, self.nServers * self.nCores)
+        jobs = [(s, self.jobServer.submit(func, (s,), (self.splitterFn,), globals=globals())) for s in splits]
+ results = [(j[0], j[1]()) for j in jobs]
+ return results
+
+
+# Tests follow.
+
+def extractDOY(url, substring):
+    '''Extract the integer DOY that follows the given substring in a filename-like URL.'''
+    i = url.find(substring)
+    return int(url[i+len(substring):i+len(substring)+3])
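+# e.g. extractDOY('A2015143.L3m_DAY_NSST_sst_4km.nc', 'A2015') -> 143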
+
+
+def main(args):
+ nCores = int(args[0])
+ servers = tuple(eval(args[1]))
+ urlFile = args[2]
+ urls = [s.strip() for s in open(urlFile, 'r')]
+
+# results = PPCluster(nCores, servers).dmap(lambda u: extractDOY(u, 'A2015'), urls)
+    results = dmap(lambda u: extractDOY(u, 'A2015'), urls, nCores, servers, splitSeq)
+    return results
+
+
+if __name__ == '__main__':
+ print main(sys.argv[1:])
+
+# python cluster.py 8 '["deepdata-1"]' urls_sst_2015.txt
+# python cluster.py 1 '["deepdata-1", "deepdata-2", "deepdata-3", "deepdata-4"]' urls_sst_2015.txt
+# python cluster.py 8 '["deepdata-1", "deepdata-2", "deepdata-3", "deepdata-4"]' urls_sst_2015.txt
+
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/cluster2.py
----------------------------------------------------------------------
diff --git a/climatology/clim/cluster2.py b/climatology/clim/cluster2.py
new file mode 100644
index 0000000..7a76ad9
--- /dev/null
+++ b/climatology/clim/cluster2.py
@@ -0,0 +1,84 @@
+#
+# cluster2.py -- Parallel map function that uses a distributed cluster and multicore within each node
+#
+
+import os, sys
+from split import fixedSplit
+import pp
+#import dispy
+
+Servers = ("server-1", "server-2", "server-3", "server-4")
+NCores = 8
+
+
+def splitSeq(seq, n):
+ '''Split a sequence into N almost even parts.'''
+    m = max(1, int(len(seq) / n))            # guard against zero-length chunks when len(seq) < n
+ return fixedSplit(seq, m)
+
+
+def dmap(func, # function to parallel map across split sequence
+ seq, # sequence of data
+ nCores, # number of cores to use on each node
+ servers=[], # list of servers in the cluster
+ splitterFn=splitSeq, # function to use to split the sequence of data
+ ):
+ '''Parallel operations for a cluster of nodes, each with multiple cores.
+ '''
+ nServers = len(servers)
+    if nServers == 0:
+        servers = ('localhost',)
+        nServers = 1                         # fall back to a single local server
+ jobServer = pp.Server(nCores, ppservers=servers)
+ splits = splitSeq(seq, nServers * nCores)
+    jobs = [(s, jobServer.submit(func, (s,), (splitterFn,), globals=globals())) for s in splits]
+ results = [(j[0], j[1]()) for j in jobs]
+ return results
+
+
+class PPCluster:
+ '''Parallel operations for a cluster of nodes, each with multiple cores.
+ '''
+ def __init__(self, nCores=NCores, # number of cores to use on each node
+ servers=Servers, # list of servers in the cluster
+ splitterFn=splitSeq, # function to use to split the sequence of data
+ ):
+ self.nCores = nCores
+ self.servers = servers
+ self.nServers = len(servers)
+ self.splitterFn = splitterFn
+ self.jobServer = pp.Server(nCores, ppservers=servers)
+
+ def dmap(self, func, seq):
+ '''Distributed map function that automatically uses a cluster of nodes and a set number of cores.
+A sequence of data is split across the cluster.
+ '''
+ splits = splitSeq(seq, self.nServers * self.nCores)
+        jobs = [(s, self.jobServer.submit(func, (s,), (self.splitterFn,), globals=globals())) for s in splits]
+ results = [(j[0], j[1]()) for j in jobs]
+ return results
+
+
+# Tests follow.
+
+def extractDOY(url, substring):
+    '''Extract the integer DOY that follows the given substring in a filename-like URL.'''
+    i = url.find(substring)
+    return int(url[i+len(substring):i+len(substring)+3])
+
+
+def main(args):
+ nCores = int(args[0])
+ servers = tuple(eval(args[1]))
+ urlFile = args[2]
+ urls = [s.strip() for s in open(urlFile, 'r')]
+
+# results = PPCluster(nCores, servers).dmap(lambda u: extractDOY(u, 'A2015'), urls)
+    results = dmap(lambda u: extractDOY(u, 'A2015'), urls, nCores, servers, splitSeq)
+    return results
+
+
+if __name__ == '__main__':
+ print main(sys.argv[1:])
+
+# python cluster2.py 8 '["deepdata-1"]' urls_sst_2015.txt
+# python cluster2.py 1 '["deepdata-1", "deepdata-2", "deepdata-3", "deepdata-4"]' urls_sst_2015.txt
+# python cluster2.py 8 '["deepdata-1", "deepdata-2", "deepdata-3", "deepdata-4"]' urls_sst_2015.txt
+
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/datasets.py
----------------------------------------------------------------------
diff --git a/climatology/clim/datasets.py b/climatology/clim/datasets.py
new file mode 100644
index 0000000..226935b
--- /dev/null
+++ b/climatology/clim/datasets.py
@@ -0,0 +1,317 @@
+'''
+datasets.py -- Routines for dataset-specific capabilities: file handling, readers, etc.
+
+One Class for each dataset containing static methods and constants/templates, etc.
+
+'''
+
+import sys, os, re, datetime
+
+import numpy as np
+
+from split import splitByNDaysKeyed, groupByKeys, extractKeys
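+
+# NOTE: ymd2doy, ymd2datetime, datetime2doy, and splitByNDays_Avhrr are helpers
+# referenced below but not imported here; they are assumed to live in companion
+# utility modules.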
+
+
+def splitModisAod(seq, n):
+ return splitByNDaysKeyed(seq, n, re.compile(r'(....)(..)(..)'), lambda y, m, d: ymd2doy(y, m, d))
+
+
+def splitAvhrrSst(seq, n):
+ return splitByNDays_Avhrr(seq, n, re.compile(r'^(....)(..)(..)'))
+
+
+class ModisSst:
+ ExpectedRunTime = "28m"
+ UrlsPath = "/data/share/datasets/MODIS_L3_AQUA_11UM_V2014.0_4KM_DAILY/daily_data/A*SST*.nc"
+ ExampleFileName = 'A2010303.L3m_DAY_NSST_sst_4km.nc'
+ GetKeysRegex = r'A(....)(...).L3m_DAY_(.)S'
+
+    Variable = 'sst'
+ Mask = None
+ Coordinates = ['lat', 'lon']
+
+ OutputClimTemplate = ''
+
+ @staticmethod
+ def keysTransformer(s): return (s[1], s[0], s[2]) # DOY, YEAR, N=night / S=day
+
+ @staticmethod
+ def getKeys(url):
+ return extractKeys(url, ModisSst.GetKeysRegex, ModisSst.keysTransformer)
+
+ @staticmethod
+ def split(seq, n):
+ return [u for u in splitByNDaysKeyed(seq, n, ModisSst.GetKeysRegex, ModisSst.keysTransformer)]
+
+ @staticmethod
+ def genOutputName(doy, variable, nEpochs, averagingConfig):
+ return 'A%03d.L3m_%s_%dday_clim_%s.nc' % (
+ doy, variable, nEpochs, averagingConfig['name']) # mark each file with first day in period
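+
+    # e.g. (illustrative): given GetKeysRegex and keysTransformer above,
+    # getKeys('A2010303.L3m_DAY_NSST_sst_4km.nc') should yield ('303', '2010', 'N'),
+    # and genOutputName(303, 'sst', 5, {'name': 'gaussInterp'}) returns
+    # 'A303.L3m_sst_5day_clim_gaussInterp.nc'.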
+
+
+class ModisChlor:
+ ExpectedRunTime = "11m"
+ UrlsPath = "/Users/greguska/githubprojects/nexus/nexus-ingest/developer-box/data/modis_aqua_chl/A*chlor*.nc"
+ ExampleFileName = "A2013187.L3m_DAY_CHL_chlor_a_4km.nc"
+ GetKeysRegex = r'A(....)(...).L3m.*CHL'
+
+ Variable = 'chlor_a'
+ Mask = None
+ Coordinates = ['lat', 'lon']
+
+ OutputClimTemplate = ''
+
+ @staticmethod
+ def keysTransformer(s): return (s[1], s[0]) # DOY, YEAR
+
+ @staticmethod
+ def getKeys(url):
+ return extractKeys(url, ModisChlor.GetKeysRegex, ModisChlor.keysTransformer)
+
+ @staticmethod
+ def split(seq, n):
+ return [u for u in splitByNDaysKeyed(seq, n, ModisChlor.GetKeysRegex, ModisChlor.keysTransformer)]
+
+ @staticmethod
+ def genOutputName(doy, variable, nEpochs, averagingConfig):
+ return 'A%03d.L3m_%s_%dday_clim_%s.nc' % (
+ doy, variable, nEpochs, averagingConfig['name']) # mark each file with first day in period
+
+
+class MeasuresSsh:
+ ExpectedRunTime = "2m22s"
+ UrlsPath = "/data/share/datasets/MEASURES_SLA_JPL_1603/daily_data/ssh_grids_v1609*12.nc"
+ ExampleFileName = "ssh_grids_v1609_2006120812.nc"
+ GetKeysRegex = r'ssh.*v1609_(....)(..)(..)12.nc'
+
+ Variable = 'SLA' # sea level anomaly estimate
+ Mask = None
+ Coordinates = ['Longitude', 'Latitude'] # Time is first (len=1) coordinate, will be removed
+
+ OutputClimTemplate = ''
+
+ @staticmethod
+ def keysTransformer(s): return (ymd2doy(s[0], s[1], s[2]), s[0]) # DOY, YEAR
+
+ @staticmethod
+ def getKeys(url):
+ return extractKeys(url, MeasuresSsh.GetKeysRegex, MeasuresSsh.keysTransformer)
+
+ @staticmethod
+ def split(seq, n):
+ return [u for u in splitByNDaysKeyed(seq, n, MeasuresSsh.GetKeysRegex, MeasuresSsh.keysTransformer)]
+
+ @staticmethod
+ def genOutputName(doy, variable, nEpochs, averagingConfig):
+ return "ssh_grids_v1609_%03d_%dday_clim_%s.nc" % (int(doy), nEpochs, averagingConfig['name'])
+
+
+class CCMPWind:
+ ExpectedRunTime = "?"
+ UrlsPath = "/data/share/datasets/CCMP_V2.0_L3.0/daily_data/CCMP_Wind*_V02.0_L3.0_RSS_uncompressed.nc"
+ ExampleFileName = "CCMP_Wind_Analysis_20160522_V02.0_L3.0_RSS_uncompressed.nc"
+ GetKeysRegex = r'CCMP_Wind_Analysis_(....)(..)(..)_V.*.nc'
+
+ Variable = 'Wind_Magnitude' # to be computed as sqrt(uwnd^2 + vwnd^2)
+ Mask = None
+ Coordinates = ['latitude', 'longitude']
+
+ OutputClimTemplate = ''
+
+ @staticmethod
+ def keysTransformer(s):
+ return (ymd2doy(s[0], s[1], s[2]), s[0]) # DOY, YEAR
+
+ @staticmethod
+ def getKeys(url):
+ return extractKeys(url, CCMPWind.GetKeysRegex, CCMPWind.keysTransformer)
+
+ @staticmethod
+ def split(seq, n):
+ return [u for u in splitByNDaysKeyed(seq, n, CCMPWind.GetKeysRegex, CCMPWind.keysTransformer)]
+
+ @staticmethod
+ def genOutputName(doy, variable, nEpochs, averagingConfig):
+ return "CCMP_Wind_Analysis_V02.0_L3.0_RSS_%03d_%dday_clim_%s.nc" % (int(doy), nEpochs, averagingConfig['name'])
+
+ @staticmethod
+ def readAndMask(url, variable, mask=None, cachePath='/tmp/cache', hdfsPath=None):
+ """
+ Read a variable from a netCDF or HDF file and return a numpy masked array.
+ If the URL is remote or HDFS, first retrieve the file into a cache directory.
+ """
+ from variables import getVariables, close
+ import numpy as np  # used below to derive the wind magnitude
+ v = None
+ if mask:
+ variables = [variable, mask]
+ else:
+ variables = [variable]
+ try:
+ from cache import retrieveFile
+ path = retrieveFile(url, cachePath, hdfsPath)
+ except Exception:
+ print >> sys.stderr, 'readAndMask: Error retrieving file %s, continuing without it' % url
+ return v
+
+ if CCMPWind.Variable in variables:
+ var, fh = getVariables(path, ['uwnd', 'vwnd'], arrayOnly=True,
+ set_auto_mask=True) # returns dict of variable arrays by name
+ uwnd_avg = np.average(var['uwnd'], axis=0)
+ vwnd_avg = np.average(var['vwnd'], axis=0)
+ v = np.sqrt(uwnd_avg**2 + vwnd_avg**2) # wind speed magnitude
+ if v.shape[0] == 1: v = v[0] # throw away trivial time dimension for CF-style files
+ close(fh)
+ else:
+ try:
+ print >> sys.stderr, 'Reading variable %s from %s' % (variable, path)
+ var, fh = getVariables(path, variables, arrayOnly=True,
+ set_auto_mask=True) # return dict of variable objects by name
+ v = var[variable] # could be a masked array
+ if v.shape[0] == 1: v = v[0] # throw away trivial time dimension for CF-style files
+ close(fh)
+ except Exception:
+ print >> sys.stderr, 'readAndMask: Error, cannot read variable %s from file %s' % (variable, path)
+
+ return v
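+ # Minimal usage sketch (the path is illustrative; CCMP winds are in m/s):
+ #   mag = CCMPWind.readAndMask(
+ #       '/data/share/datasets/CCMP_V2.0_L3.0/daily_data/CCMP_Wind_Analysis_20160522_V02.0_L3.0_RSS_uncompressed.nc',
+ #       CCMPWind.Variable)
+ #   # mag is a 2D wind-speed grid on the (latitude, longitude) coordinates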
+
+class MonthlyClimDataset:
+ ExpectedRunTime = "2m"
+ UrlsPath = ''
+ ExampleFileName = ''
+ GetKeysRegex = r'(YYYY)(MM)(DD)' # placeholder regex; subclasses override it to extract year, month, day
+ Variable = 'var' # Variable name in granule
+ Mask = None
+ Coordinates = ['lat', 'lon']
+ OutputClimTemplate = ''
+
+ @staticmethod
+ def keysTransformer(s):
+ return (s[1],s[0]) # MONTH, YEAR
+
+ @staticmethod
+ def getKeys(url):
+ return extractKeys(url, MonthlyClimDataset.GetKeysRegex,
+ MonthlyClimDataset.keysTransformer)
+
+ @staticmethod
+ def split(seq, n):
+ return [u for u in splitByNDaysKeyed(seq, n,
+ MonthlyClimDataset.GetKeysRegex,
+ MonthlyClimDataset.keysTransformer)]
+
+ @staticmethod
+ def genOutputName(month, variable, nEpochs, averagingConfig):
+ # Here we use the 15th of the month to get DOY and just use any
+ # non-leap year.
+ doy = datetime2doy(ymd2datetime(2017, month, 15))
+ return 'monthly_clim_%s_%03d_month%02d_nepochs%d_%s.nc' % (
+ variable, doy, month, nEpochs,
+ averagingConfig['name']) # mark each file with month
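+ # Illustrative example: month=4 -> DOY of April 15 in a non-leap year is 105, so
+ #   genOutputName(4, 'var', 1, {'name': 'pixelAverage'})
+ #   -> 'monthly_clim_var_105_month04_nepochs1_pixelAverage.nc'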
+
+
+class SMAP_L3M_SSS(MonthlyClimDataset):
+ UrlsPath = "/data/share/datasets/SMAP_L3_SSS/monthly/RSS_smap_SSS_monthly_*.nc"
+ ExampleFileName = 'RSS_smap_SSS_monthly_2015_04_v02.0.nc'
+ GetKeysRegex = r'RSS_smap_SSS_monthly_(....)_(..)_v02'
+ Variable = 'sss_smap'
+
+ @staticmethod
+ def getKeys(url):
+ return extractKeys(url, SMAP_L3M_SSS.GetKeysRegex,
+ SMAP_L3M_SSS.keysTransformer)
+
+ @staticmethod
+ def split(seq, n):
+ return [u for u in splitByNDaysKeyed(seq, n,
+ SMAP_L3M_SSS.GetKeysRegex,
+ SMAP_L3M_SSS.keysTransformer)]
+
+ @staticmethod
+ def genOutputName(month, variable, nEpochs, averagingConfig):
+ # Here we use the 15th of the month to get DOY and just use any
+ # non-leap year.
+ doy = datetime2doy(ymd2datetime(2017, month, 15))
+ return '%s_L3m_clim_doy%03d_month%02d_nepochs%d_%s.nc' % (
+ variable, doy, month, nEpochs,
+ averagingConfig['name']) # mark each file with month
+
+class GRACE_Tellus(MonthlyClimDataset):
+ GetKeysRegex = r'GRCTellus.JPL.(....)(..)(..).GLO'
+ Variable = 'lwe_thickness' # Liquid_Water_Equivalent_Thickness
+
+ @staticmethod
+ def getKeys(url):
+ return extractKeys(url, GRACE_Tellus.GetKeysRegex,
+ GRACE_Tellus.keysTransformer)
+
+ @staticmethod
+ def split(seq, n):
+ return [u for u in splitByNDaysKeyed(seq, n,
+ GRACE_Tellus.GetKeysRegex,
+ GRACE_Tellus.keysTransformer)]
+
+ @staticmethod
+ def genOutputName(month, variable, nEpochs, averagingConfig):
+ # Here we use the 15th of the month to get DOY and just use any
+ # non-leap year.
+ doy = datetime2doy(ymd2datetime(2017, month, 15))
+ return 'GRACE_Tellus_monthly_%s_%03d_month%02d_nepochs%d_%s.nc' % (
+ variable, doy, month, nEpochs,
+ averagingConfig['name']) # mark each file with month
+
+class GRACE_Tellus_monthly_land(GRACE_Tellus):
+ UrlsPath = "/data/share/datasets/GRACE_Tellus/monthly_land/GRCTellus.JPL.*.nc"
+ ExampleFileName = "GRCTellus.JPL.20150122.GLO.RL05M_1.MSCNv02CRIv02.land.nc"
+
+ @staticmethod
+ def genOutputName(month, variable, nEpochs, averagingConfig):
+ # Here we use the 15th of the month to get DOY and just use any
+ # non-leap year.
+ doy = datetime2doy(ymd2datetime(2017, month, 15))
+ return 'GRACE_Tellus_monthly_land_%s_%03d_month%02d_nepochs%d_%s.nc' % (
+ variable, doy, month, nEpochs,
+ averagingConfig['name']) # mark each file with month
+
+class GRACE_Tellus_monthly_ocean(GRACE_Tellus):
+ UrlsPath = "/data/share/datasets/GRACE_Tellus/monthly_ocean/GRCTellus.JPL.*.nc"
+ ExampleFileName = "GRCTellus.JPL.20150122.GLO.RL05M_1.MSCNv02CRIv02.ocean.nc"
+
+ @staticmethod
+ def genOutputName(month, variable, nEpochs, averagingConfig):
+ # Here we use the 15th of the month to get DOY and just use any
+ # non-leap year.
+ doy = datetime2doy(ymd2datetime(2017, month, 15))
+ return 'GRACE_Tellus_monthly_ocean_%s_%03d_month%02d_nepochs%d_%s.nc'%(
+ variable, doy, month, nEpochs,
+ averagingConfig['name']) # mark each file with month
+
+
+DatasetList = {'ModisSst': ModisSst, 'ModisChlor': ModisChlor,
+ 'MeasuresSsh': MeasuresSsh, 'CCMPWind': CCMPWind,
+ 'SMAP_L3M_SSS': SMAP_L3M_SSS,
+ 'GRACE_Tellus_monthly_ocean': GRACE_Tellus_monthly_ocean,
+ 'GRACE_Tellus_monthly_land': GRACE_Tellus_monthly_land}
+
+
+# Utils follow.
+def ymd2doy(year, mon, day):
+ return datetime2doy(ymd2datetime(year, mon, day))
+
+
+def ymd2datetime(y, m, d):
+ y, m, d = map(int, (y, m, d))
+ return datetime.datetime(y, m, d)
+
+
+def datetime2doy(dt):
+ return int(dt.strftime('%j'))
+
+
+def doy2datetime(year, doy):
+ '''Convert year and DOY (day of year) to datetime object.'''
+ return datetime.datetime(int(year), 1, 1) + datetime.timedelta(int(doy) - 1)
+
+
+def doy2month(year, doy): return doy2datetime(year, doy).strftime('%m')
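+
+
+# Illustrative round-trip checks for the date utilities above:
+#   ymd2doy(2010, 10, 30) -> 303   # matches the DOY embedded in granule name 'A2010303...'
+#   doy2datetime(2010, 303) -> datetime.datetime(2010, 10, 30, 0, 0)
+#   doy2month(2010, 303) -> '10'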
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/dparkTest.py
----------------------------------------------------------------------
diff --git a/climatology/clim/dparkTest.py b/climatology/clim/dparkTest.py
new file mode 100644
index 0000000..be92cbd
--- /dev/null
+++ b/climatology/clim/dparkTest.py
@@ -0,0 +1,9 @@
+
+import dpark
+
+lines = dpark.textFile("/usr/share/dict/words", 128)
+#lines = dpark.textFile("/usr/share/doc/gcc-4.4.7/README.Portability", 128)
+words = lines.flatMap(lambda x:x.split()).map(lambda x:(x,1))
+wc = words.reduceByKey(lambda x,y:x+y).collectAsMap()
+print wc[wc.keys()[0]]  # print the count for an arbitrary word
+
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/gaussInterp.py
----------------------------------------------------------------------
diff --git a/climatology/clim/gaussInterp.py b/climatology/clim/gaussInterp.py
new file mode 100644
index 0000000..f694d7a
--- /dev/null
+++ b/climatology/clim/gaussInterp.py
@@ -0,0 +1,43 @@
+#
+# gaussInterp routine -- Gaussian weighted smoothing in lat, lon, and time
+#
+# Based on Ed Armstrong's routines.
+#
+# Calls into optimized routines in Fortran or cython.
+# See gaussInterp_f.f or gaussInterp.pyx
+#
+
+import numpy as N, time
+from gaussInterp_f import gaussinterp_f
+
+
+def gaussInterp(var, # bundle of input arrays: masked variable, coordinates
+ varNames, # list of names in order: primary variable, coordinates in order lat, lon, time
+ outlat, outlon, # output lat/lon coordinate vectors
+ wlat, wlon, # window of lat/lon neighbors to gaussian weight, expressed in delta lat (degrees)
+ slat, slon, stime, # sigma for gaussian downweighting with distance in lat, lon (deg), & time (days)
+ vfactor=-0.6931, # factor in front of gaussian expression
+ missingValue=-9999., # value to mark missing values in interp result
+ verbose=1, # integer to set verbosity level
+ optimization='fortran'): # Mode of optimization, using 'fortran' or 'cython'
+ '''Gaussian interpolate in lat, lon, and time to a different lat/lon grid, and over a time window to the center time.
+Bundle of arrays (var) contains a 3D masked variable and coordinate arrays for lat, lon, and time read from netCDF/HDF files.
+Returns the 2D interpolated variable (masked) and a status for failures.
+ '''
+ v = var[varNames[0]][:] # real*4 in Fortran code, is v.dtype correct?
+ vmask = N.ma.getmask(v).astype('int8')[:] # integer*1, convert bool mask to one-byte integer for Fortran
+ vtime = var[varNames[1]][:] # integer*4 in Fortran
+ lat = var[varNames[2]][:] # real*4
+ lon = var[varNames[3]][:] # real*4
+ if optimization == 'fortran':
+ vinterp, vweight, status = \
+ gaussinterp_f(v, vmask, vtime, lat, lon,
+ outlat, outlon, wlat, wlon, slat, slon, stime, vfactor, missingValue, verbose)
+ else:
+ vinterp, vweight, status = \
+ gaussInterp_(v, vmask, vtime, lat, lon,
+ outlat, outlon, wlat, wlon, slat, slon, stime, vfactor, missingValue, verbose)
+
+ vinterp = N.ma.masked_where(vweight == 0.0, vinterp)
+ return (vinterp, vweight, status)
+
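+# Minimal usage sketch (array names and window/sigma values are illustrative):
+#   bundle = {'sst': sst3d, 'time': days, 'lat': lats, 'lon': lons}  # 3D masked variable + coordinate vectors
+#   vinterp, vweight, status = gaussInterp(bundle, ['sst', 'time', 'lat', 'lon'],
+#       outlat, outlon, wlat=2., wlon=2., slat=0.5, slon=0.5, stime=2.)
+#   # vinterp comes back masked wherever no neighbor contributed weight (vweight == 0)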
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/climatology/clim/gaussInterp.pyx
----------------------------------------------------------------------
diff --git a/climatology/clim/gaussInterp.pyx b/climatology/clim/gaussInterp.pyx
new file mode 100644
index 0000000..943e298
--- /dev/null
+++ b/climatology/clim/gaussInterp.pyx
@@ -0,0 +1,131 @@
+#
+# gaussInterp routine -- Gaussian weighted smoothing in lat, lon, and time
+#
+# Based on Ed Armstrong's routines. Cython implementation; see gaussInterp_f.f
+# for the Fortran/f2py version.
+#
+# Gaussian weighting = exp( vfactor * (((x - x0)/sx)^2 + ((y - y0)/sy)^2 + ((t - t0)/st)^2 ))
+#
+# where the deltas are distances in lat, lon, and time, and sx, sy, st are one e-folding sigmas.
+#
+# Cutoffs for neighbors allowed in the interpolation are set by distance in lat/lon
+# (see the wlat/wlon windows); for time, all epochs are included.
+#
+import sys
+import numpy as np
+cimport numpy as np
+from math import exp
+
+from gaussInterp_f import gaussinterp_f  # f2py lowercases the routine name (cf. gaussInterp.py)
+
+Var = np.float64 # numpy dtype of the variable arrays
+ctypedef np.float64_t Var_t # matching compile-time type for Cython buffers
+
+VERBOSE = 1
+
+
+def gaussInterp(var, # bundle of input arrays: masked variable, coordinates
+ varNames, # list of names in order: primary variable, coordinates in order lat, lon, time
+ outlat, outlon, # output lat/lon coordinate vectors
+ wlat, wlon, # window of lat/lon neighbors to gaussian weight, expressed in delta lat (degrees)
+ slat, slon, stime, # sigma for gaussian downweighting with distance in lat, lon (deg), & time (days)
+ vfactor=-0.6931, # factor in front of gaussian expression
+ missingValue=-9999., # value to mark missing values in interp result
+ verbose=VERBOSE, # integer to set verbosity level
+ optimization='cython'): # Mode of optimization, using 'fortran' or 'cython'
+ '''Gaussian interpolate in lat, lon, and time to a different lat/lon grid, and over a time window to the center time.
+Bundle of arrays (var) contains a 3D masked variable and coordinate arrays for lat, lon, and time read from netCDF/HDF files.
+Returns the 2D interpolated variable (masked) and a status for failures.
+ '''
+ v = var[varNames[0]][:]
+ vmask = np.ma.getmask(v).astype('int8')[:] # convert bool mask to one-byte integers, as in gaussInterp.py
+ vtime = var[varNames[1]][:]
+ lat = var[varNames[2]][:]
+ lon = var[varNames[3]][:]
+ if optimization == 'fortran':
+ vinterp, vweight, status = \
+ gaussinterp_f(v, vmask, vtime, lat, lon,
+ outlat, outlon, wlat, wlon, slat, slon, stime, vfactor, missingValue)
+ else:
+ vinterp, vweight, status = \
+ gaussInterp_(v, vmask, vtime, lat, lon,
+ outlat, outlon, wlat, wlon, slat, slon, stime, vfactor, missingValue)
+ vinterp = np.ma.masked_where(vweight == 0.0, vinterp) # mask cells that received no weight
+ return (vinterp, vweight, status)
+
+
+#@cython.boundscheck(False)
+def gaussInterp_(np.ndarray[Var_t, ndim=3] var, # variable & mask arrays with dimensions of lat,lon,time
+ np.ndarray[np.int8_t, ndim=3] vmask, # one-byte integer mask (see the conversion in the dispatcher)
+ np.ndarray[np.int_t, ndim=1] vtime,
+ np.ndarray[np.float_t, ndim=1] lat,
+ np.ndarray[np.float_t, ndim=1] lon, # coordinate vectors for inputs
+ np.ndarray[np.float_t, ndim=1] outlat, # coordinate vectors for grid to interpolate to
+ np.ndarray[np.float_t, ndim=1] outlon,
+ float wlat, float wlon, # window of lat/lon neighbors to gaussian weight, expressed in delta lat (degrees)
+ float slat, float slon, float stime, # sigma for gaussian downweighting with distance in lat, lon (deg), & time (days)
+ float vfactor, # factor in front of gaussian expression
+ float missingValue): # value to mark missing values in interp result
+ '''Gaussian interpolate in lat, lon, and time to a different lat/lon grid, and over a time window to the center time.
+Returns the 2D interpolated variable (masked), the weight array, and a status for failures.
+ '''
+ assert var.dtype == Var and vmask.dtype == np.int8
+
+ cdef np.ndarray[Var_t, ndim=2] vinterp = np.zeros( (outlat.shape[0], outlon.shape[0]), dtype=Var ) # interpolated variable, missing values not counted
+ cdef np.ndarray[Var_t, ndim=2] vweight = np.zeros( (outlat.shape[0], outlon.shape[0]), dtype=Var ) # weight of values interpolated (can be zero)
+ cdef int status = 0 # negative status indicates error
+
+ cdef int imin, imax, jmin, jmax
+ cdef int iin, jin, kin
+ cdef int i, j
+
+ cdef int ntime = vtime.shape[0]
+ cdef int nlat = lat.shape[0]
+ cdef int nlon = lon.shape[0]
+
+ cdef int noutlat = outlat.shape[0]
+ cdef int noutlon = outlon.shape[0]
+
+ cdef float wlat2 = wlat / 2.
+ cdef float wlon2 = wlon / 2.
+ cdef float lat0 = lat[0]
+ cdef float lon0 = lon[0]
+ cdef float dlat = lat[1] - lat[0]
+ cdef float dlon = lon[1] - lon[0]
+ cdef double midTime = vtime[int(ntime/2 + 0.5)]
+
+ for i in xrange(noutlat):
+ if VERBOSE: print >>sys.stderr, 'gaussInterp_: processing output latitude', outlat[i]
+ for j in xrange(noutlon):
+ imin = clamp(int((outlat[i] - wlat2 - lat0)/dlat + 0.5), 0, nlat-1)
+ imax = clamp(int((outlat[i] + wlat2 - lat0)/dlat + 0.5), 0, nlat-1)
+ jmin = clamp(int((outlon[j] - wlon2 - lon0)/dlon + 0.5), 0, nlon-1)
+ jmax = clamp(int((outlon[j] + wlon2 - lon0)/dlon + 0.5), 0, nlon-1)
+
+ for kin in xrange(ntime):
+ for iin in xrange(imin, imax+1):
+ for jin in xrange(jmin, jmax+1):
+ if not vmask[kin, iin, jin]:
+ fac = exp( vfactor *
+ (((outlat[i] - lat[iin])/slat)**2
+ + ((outlon[j] - lon[jin])/slon)**2
+ + ((midTime - vtime[kin])/stime)**2))
+ val = var[kin, iin, jin]
+ if VERBOSE > 1: print >>sys.stderr, kin, iin, jin, vtime[kin], lat[iin], lon[jin], val, fac, val*fac
+
+ vinterp[i,j] = vinterp[i,j] + val * fac
+ vweight[i,j] = vweight[i,j] + fac
+
+ if vweight[i,j] != 0.0:
+ vinterp[i,j] = vinterp[i,j] / vweight[i,j]
+ else:
+ vinterp[i,j] = missingValue
+
+ return (vinterp, vweight, status)
+
+
+cdef int clamp(int i, int n, int m):
+ if i < n: return n
+ if i > m: return m
+ return i
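+
+# Illustrative: clamp(-5, 0, 99) -> 0; clamp(42, 0, 99) -> 42; clamp(120, 0, 99) -> 99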