Posted to commits@vxquery.apache.org by pr...@apache.org on 2013/11/17 17:16:45 UTC
svn commit: r1542755 - in
/incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts:
weather_cli.py weather_convert_to_xml.py weather_data_files.py
Author: prestonc
Date: Sun Nov 17 16:16:44 2013
New Revision: 1542755
URL: http://svn.apache.org/r1542755
Log:
Added support for creating separate sensor and station XML files. The CLI has also been streamlined to make it a little simpler. Also added support for partitioning under the new method of standard folder names instead of per-run customization, which helps with automating the script.
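As a rough illustration of the new section-based flow (the save directory and partition count here are hypothetical examples, not part of this commit), the whole pipeline or a single section can be run with:

    python weather_cli.py -s /tmp/weather -p gsn             (run all sections)
    python weather_cli.py -s /tmp/weather -l download -r     (redo just the download section)
    python weather_cli.py -s /tmp/weather -l partition -a 4  (copy the generated data into 4 partitions)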
Modified:
incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
Modified: incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py?rev=1542755&r1=1542754&r2=1542755&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py Sun Nov 17 16:16:44 2013
@@ -30,24 +30,18 @@ COMPRESSED = False
# http://www1.ncdc.noaa.gov/pub/data/ghcn/daily/readme.txt
#
def main(argv):
- append = True
- download_reset = False
max_records = 0
- no_data_processing = False
- nodes = 0
package = "ghcnd_gsn"
- partition_stations = False
partitions = 0
- print_stats = False
process_file_name = ""
reset = False
save_path = "/tmp"
- update = False
- web_service_download = False
+ section = "all"
token = ""
+ update = False
try:
- opts, args = getopt.getopt(argv, "a:cdf:lhm:no:p:s:truw:", ["max_station_files=", "file=", "save_directory=", "package=", "partitions=", "nodes=", "web_service="])
+ opts, args = getopt.getopt(argv, "a:cf:hl:m:p:rs:uvw:", ["max_station_files=", "file=", "locality=", "save_directory=", "package=", "partitions=", "nodes=", "web_service="])
except getopt.GetoptError:
print 'The options for weather_cli.py were not correctly specified.'
print 'To see a full list of options try:'
@@ -56,35 +50,30 @@ def main(argv):
for opt, arg in opts:
if opt == '-h':
print 'Converting weather daily files to xml options:'
- print ' -a (int) The number of partitions for creating split up data progress csv files. Used with -o'
+ print ' -a (int) The number of partitions used to split up the generated data.'
print ' -c Compress the produced XML file with .gz.'
- print ' -d Extra debug information.'
print ' -f (str) The file name of a specific station to process.'
print ' * Helpful when testing the XML file output of a single station.'
- print ' -l Reset download.'
+ print ' -l (str) Select the locality (section) of the script to execute (download, progress_file, sensor_build, station_build, partition, statistics).'
print ' -m (int) Limits the number of files created for each station.'
print ' * Helpful when testing to make sure all elements are supported for each station.'
print ' Alternate form: --max_station_files=(int)'
- print ' -n Do not process data files. (Used to only update the data processing file.)'
- print ' -o (int) The number of nodes for creating split up data progress csv files. Used with -a'
print ' -p (str) The package used to generate files. (all, gsn, hcn)'
- print ' -r Build a new data progress file. (reset)'
- print ' -s (str) The directory for saving the downloaded and created XML files.'
- print ' -t Print the statistics of the data progress file.'
+ print ' -r Reset the build process. (For one section or all sections depending on other parameters.)'
+ print ' -s (str) The directory for saving the downloaded files and generated XML files.'
print ' -u Recalculate the file count and data size for each data source file.'
+ print ' -v Extra debug information.'
print ' -w (str) Downloads the station XML file from the web service.'
sys.exit()
elif opt in ('-a', "--partitions"):
- no_data_processing = True
- partition_stations = True
- append = False
- partitions = int(arg)
+ if arg.isdigit():
+ partitions = int(arg)
+ else:
+ print 'Error: Argument must be an integer for --partitions (-a).'
+ sys.exit()
elif opt == '-c':
global COMPRESSED
COMPRESSED = True
- elif opt == '-d':
- global DEBUG_OUTPUT
- DEBUG_OUTPUT = True
elif opt in ('-f', "--file"):
# check if file exists.
if os.path.exists(arg):
@@ -92,21 +81,18 @@ def main(argv):
else:
print 'Error: Argument must be a file name for --file (-f).'
sys.exit()
- elif opt == '-l':
- download_reset = True
+ elif opt in ('-l', "--locality"):
+ if arg in ("download", "progress_file", "sensor_build", "station_build", "partition", "statistics"):
+ section = arg
+ else:
+ print 'Error: Argument for --locality (-l) must be a valid locality name.'
+ sys.exit()
elif opt in ('-m', "--max_station_files"):
if arg.isdigit():
max_records = int(arg)
else:
print 'Error: Argument must be an integer for --max_station_files (-m).'
sys.exit()
- elif opt == '-n':
- no_data_processing = True
- elif opt in ('-o', "--nodes"):
- no_data_processing = True
- partition_stations = True
- append = False
- nodes = int(arg)
elif opt in ('-p', "--package"):
if arg in ("all", "gsn", "hcn"):
package = "ghcnd_" + arg
@@ -122,11 +108,11 @@ def main(argv):
else:
print 'Error: Argument must be a directory for --save_directory (-s).'
sys.exit()
- elif opt == '-t':
- no_data_processing = True
- print_stats = True
elif opt == '-u':
update = True
+ elif opt == '-v':
+ global DEBUG_OUTPUT
+ DEBUG_OUTPUT = True
elif opt == '-w':
# check if file exists.
if arg is not "":
@@ -134,7 +120,6 @@ def main(argv):
else:
print 'Error: Argument must be a string for --web_service (-w).'
sys.exit()
- web_service_download = True
# Required fields to run the script.
if save_path == "" or not os.path.exists(save_path):
@@ -143,17 +128,19 @@ def main(argv):
# Set up downloads folder.
download_path = save_path + "/downloads"
- download = WeatherDownloadFiles(download_path)
- download.download_all_files(download_reset)
+ if section in ("all", "download"):
+ print 'Processing the download section.'
+ download = WeatherDownloadFiles(download_path)
+ download.download_all_files(reset)
- # Unzip the required file.
- download.unzip_package(package, download_reset)
+ # Unzip the required file.
+ download.unzip_package(package, reset)
# Create some basic paths for save files and references.
ghcnd_data_dly_path = download_path + '/' + package + '/' + package
- ghcnd_xml_path = save_path + "/1_node_" + package + '_xml/'
- ghcnd_xml_gz_path = save_path + "/1_node_" + package + '_xml_gz/'
+ ghcnd_xml_path = save_path + "/1.0_partition_" + package + '_xml/'
+ ghcnd_xml_gz_path = save_path + "/1.0_partition_" + package + '_xml_gz/'
if COMPRESSED:
xml_data_save_path = ghcnd_xml_gz_path
else:
@@ -168,49 +155,59 @@ def main(argv):
convert = WeatherWebServiceMonthlyXMLFile(download_path, xml_data_save_path, COMPRESSED, DEBUG_OUTPUT)
progress_file = xml_data_save_path + "_data_progress.csv"
data = WeatherDataFiles(ghcnd_data_dly_path, progress_file)
- options = list()
- if append:
+ if section in ("all", "progress_file"):
+ print 'Processing the progress_file section.'
+ options = list()
options.append('append')
- if update:
- options.append('recalculate')
- if reset:
- options.append('reset')
- data.build_progress_file(options, convert)
+ if update:
+ options.append('recalculate')
+ if reset:
+ options.append('reset')
+ data.build_progress_file(options, convert)
- if not no_data_processing:
- if token is not "":
- convert.set_token(token)
+ if section in ("all", "sensor_build"):
+ print 'Processing the sensor_build section.'
if process_file_name is not "":
# process a single file
if os.path.exists(process_file_name):
- (file_count, data_size) = convert.process_file(process_file_name, max_records)
- data.update_file_status(process_file_name, WeatherDataFiles.DATA_FILE_CREATED, file_count, data_size)
+ (file_count, data_size) = convert.process_sensor_file(process_file_name, max_records)
+ data.update_file_sensor_status(process_file_name, WeatherDataFiles.DATA_FILE_GENERATED, file_count, data_size)
else:
- data.update_file_status(process_file_name, WeatherDataFiles.DATA_FILE_MISSING)
+ data.update_file_sensor_status(process_file_name, WeatherDataFiles.DATA_FILE_MISSING)
else:
# process directory
data.reset()
+ data.set_type("sensor")
for file_name in data:
file_path = ghcnd_data_dly_path + '/' + file_name
if os.path.exists(file_path):
- (file_count, data_size) = convert.process_file(file_path, max_records)
- data.update_file_status(file_name, WeatherDataFiles.DATA_FILE_CREATED, file_count, data_size)
+ (file_count, data_size) = convert.process_sensor_file(file_path, max_records)
+ data.update_file_sensor_status(file_name, WeatherDataFiles.DATA_FILE_GENERATED, file_count, data_size)
else:
- data.update_file_status(file_name, WeatherDataFiles.DATA_FILE_MISSING)
+ data.update_file_sensor_status(file_name, WeatherDataFiles.DATA_FILE_MISSING)
- elif web_service_download and token is not "":
- # process directory
- data.reset()
- for file_name in data:
- station_id = file_name[:file_name.index('.')]
- convert.download_station_data(station_id, token)
+ if section in ("all", "station_build"):
+ data.reset()
+ data.set_type("station")
+ if token is not "":
+ convert.set_token(token)
+ for file_name in data:
+ file_path = ghcnd_data_dly_path + '/' + file_name
+ if os.path.exists(file_path):
+ return_status = convert.process_station_file(file_path)
+ status = data.get_station_status(return_status)
+ data.update_file_station_status(file_name, status)
+ else:
+ data.update_file_station_status(file_name, WeatherDataFiles.DATA_FILE_MISSING)
- elif print_stats:
- data.print_progress_file_stats(convert)
- elif partition_stations and nodes > 0 and partitions > 0:
+ if section in ("all", "partition") and partitions > 1:
+ print 'Processing the partition section.'
data.reset()
- data.build_partition_structure(nodes, partitions)
+ data.copy_to_n_partitions(xml_data_save_path, partitions)
+ if section in ("all", "statistics"):
+ print 'Processing the statistics section.'
+ data.print_progress_file_stats(convert)
if __name__ == "__main__":
main(sys.argv[1:])
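For context on the station_build section above: process_station_file() (added in weather_convert_to_xml.py below) returns a numeric status that get_station_status() in weather_data_files.py maps to a progress-file value, with 2 becoming "downloaded", 1 becoming "generated", and anything else "missing". A minimal sketch of that dispatch, using the names from this commit:

    return_status = convert.process_station_file(file_path)
    status = data.get_station_status(return_status)  # "downloaded", "generated", or "missing"
    data.update_file_station_status(file_name, status)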
Modified: incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py?rev=1542755&r1=1542754&r2=1542755&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py Sun Nov 17 16:16:44 2013
@@ -100,12 +100,17 @@ class WeatherConvertToXML:
return 0
def get_base_folder(self, station_id, data_type="sensors"):
- # Default
- station_prefix = station_id[:3]
- return self.save_path + data_type + "/" + station_prefix + "/" + station_id + "/"
+ return build_base_save_folder(self.save_path, station_id, data_type)
- def process_file(self, file_name, max_files):
- print "Processing file: " + file_name
+ def process_station_file(self, file_name):
+ print "Processing station file: " + file_name
+ file_stream = open(file_name, 'r')
+
+ row = file_stream.readline()
+ return self.process_station_data(row)
+
+ def process_sensor_file(self, file_name, max_files):
+ print "Processing sensor file: " + file_name
file_stream = open(file_name, 'r')
month_last = 0
@@ -188,7 +193,6 @@ class WeatherConvertToXML:
field_xml += " <observation_day>" + str(day) + "</observation_day>\n"
return field_xml
-
def default_xml_field_station_id(self, station_id, indent=2):
field_xml = ""
field_xml += self.get_indent_space(indent) + "<station_id>" + station_id + "</station_id>\n"
@@ -430,9 +434,13 @@ class WeatherWebServiceMonthlyXMLFile(We
# Station data
def process_station_data(self, row):
station_id = self.get_dly_field(row, DLY_FIELD_ID)
-
+ download = 0
if self.token is not "":
- return self.download_station_data(station_id, self.token, True)
+ download = self.download_station_data(station_id, self.token, True)
+
+ # If the station file was downloaded, pass that status back; otherwise generate it locally.
+ if download != 0:
+ return download
else:
# Information for each daily file.
station_xml_file = self.default_xml_start()
@@ -457,25 +465,34 @@ class WeatherWebServiceMonthlyXMLFile(We
# Station data
def download_station_data(self, station_id, token, reset = False):
-
# Make sure the station folder is available.
ghcnd_xml_station_path = self.get_base_folder(station_id, "stations")
if not os.path.isdir(ghcnd_xml_station_path):
os.makedirs(ghcnd_xml_station_path)
- # Save XML string to disk.
- save_file_name = ghcnd_xml_station_path + station_id + ".xml"
-
# Build download URL.
url = "http://www.ncdc.noaa.gov/cdo-services/services/datasets/GHCND/stations/GHCND:" + station_id + ".xml?token=" + token
-
- # Get station web service file.
- station_xml_file = download_file_save_as(url, save_file_name, reset)
+ url_file = urllib.urlopen(url)
+ station_xml_file = ""
+ while (True):
+ line = url_file.readline()
+ if not line:
+ break
+ station_xml_file += line
+
+ if station_xml_file.find("<cdoError>") != -1:
+ if self.debug_output:
+ print "Error in station download"
+ return 0
+ # Save XML string to disk.
+ save_file_name = ghcnd_xml_station_path + station_id + ".xml"
+ save_file_name = self.save_file(save_file_name, station_xml_file)
+
if save_file_name is not "":
if self.debug_output:
print "Wrote file: " + save_file_name
- return 1
+ return 2
else:
return 0
@@ -533,3 +550,8 @@ class WeatherWebServiceMonthlyXMLFile(We
else:
return 0
+def build_base_save_folder(save_path, station_id, data_type="sensors"):
+ # Default
+ station_prefix = station_id[:3]
+ return save_path + data_type + "/" + station_prefix + "/" + station_id + "/"
+
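Moving build_base_save_folder() to module level lets weather_data_files.py reuse the same directory layout when copying partitions. A minimal sketch of what it produces (the save path and station id are hypothetical examples):

    build_base_save_folder("/tmp/1.0_partition_ghcnd_gsn_xml/", "US1FLSL0019", "sensors")
    # returns "/tmp/1.0_partition_ghcnd_gsn_xml/sensors/US1/US1FLSL0019/"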
Modified: incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
URL: http://svn.apache.org/viewvc/incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py?rev=1542755&r1=1542754&r2=1542755&view=diff
==============================================================================
--- incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py (original)
+++ incubator/vxquery/trunk/vxquery/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py Sun Nov 17 16:16:44 2013
@@ -17,23 +17,30 @@
import glob
import os.path
import linecache
+import distutils.dir_util
+
+from weather_convert_to_xml import *
# Weather data files created to manage the conversion process.
# Allows partitioning and picking up where you left off.
class WeatherDataFiles:
INDEX_DATA_FILE_NAME = 0
- INDEX_DATA_STATUS = 1
- INDEX_DATA_FILE_COUNT = 2
- INDEX_DATA_FOLDER_DATA = 3
+ INDEX_DATA_SENSORS_STATUS = 1
+ INDEX_DATA_STATION_STATUS = 2
+ INDEX_DATA_FILE_COUNT = 3
+ INDEX_DATA_FOLDER_DATA = 4
DATA_FILE_START_INDEX = 0
DATA_FILE_EXTENSION = ".dly"
DATA_FILE_MISSING = "missing"
DATA_FILE_INITIAL = "initialized"
- DATA_FILE_CREATED = "created"
+ DATA_FILE_DOWNLOADED = "downloaded"
+ DATA_FILE_GENERATED = "generated"
SEPERATOR = ","
+ type = "sensor"
+
def __init__(self, base_path, progress_file_name="/tmp/_weather_data.csv"):
self.base_path = base_path
@@ -62,70 +69,73 @@ class WeatherDataFiles:
self.open_progress_data()
row_count = len(self.progress_data)
for row in range(0, row_count):
- file_name = self.progress_data[row].rsplit(self.SEPERATOR)[self.INDEX_DATA_FILE_NAME]
+ row_contents = self.progress_data[row].rsplit(self.SEPERATOR)
+ file_name = row_contents[self.INDEX_DATA_FILE_NAME]
if self.get_file_row(file_name) < 0 and 'append' in options:
- self.progress_data.append(self.get_progress_csv_row(file_name, self.DATA_FILE_INITIAL))
+ self.progress_data.append(self.get_progress_csv_row(file_name, self.DATA_FILE_INITIAL, self.DATA_FILE_INITIAL))
elif 'recalculate' in options:
# The folder is hard coded
station_id = os.path.basename(file_name).split('.')[0]
folder_name = convert.get_base_folder(station_id)
if os.path.exists(folder_name):
+ row_contents = self.progress_data[row].rsplit(self.SEPERATOR)
+ sensor_status = row_contents[self.INDEX_DATA_SENSORS_STATUS]
+ station_status = row_contents[self.INDEX_DATA_STATION_STATUS]
file_count = self.get_file_count(folder_name)
data_size = self.get_folder_size(folder_name)
- self.progress_data[row] = self.get_progress_csv_row(file_name, self.DATA_FILE_CREATED, file_count, data_size)
+ self.progress_data[row] = self.get_progress_csv_row(file_name, sensor_status, station_status, file_count, data_size)
else:
- self.progress_data[row] = self.get_progress_csv_row(file_name, self.DATA_FILE_INITIAL)
+ self.progress_data[row] = self.get_progress_csv_row(file_name, self.DATA_FILE_INITIAL, self.DATA_FILE_INITIAL)
# Save file
self.close_progress_data(True)
self.reset()
- # Save Functions
- def build_partition_structure(self, nodes, partitions):
+ # Once the initial data has been generated, the data can be copied into a set number of partitions.
+ def copy_to_n_partitions(self, save_path, partitions):
+
+ # Initialize the partition paths.
+ partition_paths = []
+ for i in range(0, partitions):
+ new_partition_path = save_path.replace("1.0_partition", str(partitions) + "." + str(i) + "_partition")
+ partition_paths.append(new_partition_path)
+
+ # Make sure the xml folder is available.
+ if not os.path.isdir(new_partition_path):
+ os.makedirs(new_partition_path)
+
+ # Copy the station and sensor files into each partition in round-robin order.
+ current_partition = 0
+ csv_sorted = self.get_csv_in_partition_order()
+ for item in csv_sorted:
+ file_name = item.rsplit(self.SEPERATOR)[self.INDEX_DATA_FILE_NAME]
+ station_id = os.path.basename(file_name).split('.')[0]
+
+ # Copy both the sensor and station files for this station id.
+ for type in ("sensors", "stations"):
+ file_path = build_base_save_folder(save_path, station_id, type)
+ new_file_path = build_base_save_folder(partition_paths[current_partition], station_id, type)
+ distutils.dir_util.copy_tree(file_path, new_file_path)
+
+ # Update partition
+ current_partition += 1
+ if current_partition >= partitions:
+ current_partition = 0
+
+ def get_csv_in_partition_order(self):
self.open_progress_data()
row_count = len(self.progress_data)
# Get the dictionary of all the files and data sizes.
csv_dict = dict()
for row in range(0, row_count):
- file_name = self.progress_data[row].rsplit(self.SEPERATOR)[self.INDEX_DATA_FILE_NAME]
- folder_data = int(self.progress_data[row].rsplit(self.SEPERATOR)[self.INDEX_DATA_FOLDER_DATA])
+ row_contents = self.progress_data[row].rsplit(self.SEPERATOR)
+ file_name = row_contents[self.INDEX_DATA_FILE_NAME]
+ folder_data = int(row_contents[self.INDEX_DATA_FOLDER_DATA])
csv_dict[file_name] = folder_data
# New sorted list.
- csv_sorted = sorted(csv_dict, key=csv_dict.get, reverse=True)
-
- # Initialize the partition variables.
- total_partitions = nodes * partitions
- current_partition = 0
- list_of_partitions = []
- for i in range(0, total_partitions):
- list_of_partitions.append(set())
-
- # Add the files in a round robin order.
- for item in csv_sorted:
- list_of_partitions[current_partition].add(item)
- current_partition += 1
- if current_partition >= total_partitions:
- current_partition = 0
-
- # Save list of files for each node's partitions.
- for i in range(0, nodes):
- for j in range(0, partitions):
- current_partition = (i * partitions) + j
- self.write_partition_file(i + 1, j + 1, list_of_partitions[current_partition])
-
-
- # Write out the partition file list to a CSV file.
- def write_partition_file(self, node, partition, items):
- save_partition_file = "node_" + str(node) + "_level_" + str(partition) + ".csv"
- file = open(save_partition_file, 'w')
- contents = ""
- for file_name in items:
- contents += self.get_progress_csv_row(file_name, self.DATA_FILE_INITIAL)
- file.write(contents)
- file.close()
-
+ return sorted(csv_dict, key=csv_dict.get, reverse=True)
def get_file_row(self, file_name):
for i in range(0, len(self.progress_data)):
@@ -137,62 +147,92 @@ class WeatherDataFiles:
contents = ""
for path in self.get_file_list_iterator():
file_name = os.path.basename(path)
- contents += self.get_progress_csv_row(file_name, self.DATA_FILE_INITIAL)
+ contents += self.get_progress_csv_row(file_name, self.DATA_FILE_INITIAL, self.DATA_FILE_INITIAL)
return contents
def print_progress_file_stats(self, convert):
- station_count_missing = 0
- station_count = 0
+ sensor_count_missing = 0
+ sensor_count = 0
file_count = 0
data_size = 0
- station_count_actual = 0
+ sensor_count_actual = 0
file_count_actual = 0
data_size_actual = 0
+ station_count_missing = 0
+ station_count_generated = 0
+ station_count_downloaded = 0
+
self.open_progress_data()
row_count = len(self.progress_data)
for row in range(0, row_count):
row_contents = self.progress_data[row].rsplit(self.SEPERATOR)
if int(row_contents[self.INDEX_DATA_FILE_COUNT]) != -1 and int(row_contents[self.INDEX_DATA_FOLDER_DATA]) != -1:
- station_count += 1
+ sensor_count += 1
file_count += int(row_contents[self.INDEX_DATA_FILE_COUNT])
data_size += int(row_contents[self.INDEX_DATA_FOLDER_DATA])
else:
- station_count_missing += 1
+ sensor_count_missing += 1
+ if row_contents[self.INDEX_DATA_STATION_STATUS] == "generated":
+ station_count_generated += 1
+ if row_contents[self.INDEX_DATA_STATION_STATUS] == "downloaded":
+ station_count_downloaded += 1
+ else:
+ station_count_missing += 1
+
file_name = row_contents[self.INDEX_DATA_FILE_NAME]
station_id = os.path.basename(file_name).split('.')[0]
folder_name = convert.get_base_folder(station_id)
if os.path.exists(folder_name):
- station_count_actual += 1
+ sensor_count_actual += 1
file_count_actual += self.get_file_count(folder_name)
data_size_actual += self.get_folder_size(folder_name)
print "Progress File:\t" + self.progress_file_name + "\n"
- print "CSV DETAILS OF PROCESSED STATIONS"
- print "Number of stations:\t" + "{:,}".format(station_count)
+ print "CSV DETAILS OF PROCESSED SENSORS"
+ print "Number of stations:\t" + "{:,}".format(sensor_count)
print "Number of files:\t" + "{:,}".format(file_count)
print "Data size:\t\t" + "{:,}".format(data_size) + " Bytes\n"
- print "CSV DETAILS OF unPROCESSED STATIONS"
- print "Number of stations:\t" + "{:,}".format(station_count_missing) + "\n"
+ print "CSV DETAILS OF unPROCESSED SENSORS"
+ print "Number of stations:\t" + "{:,}".format(sensor_count_missing) + "\n"
+
+ print "CSV DETAILS OF PROCESSED STATIONS"
+ print "Generated:\t\t" + "{:,}".format(station_count_generated)
+ print "Downloaded:\t\t" + "{:,}".format(station_count_downloaded)
+ print "Missing:\t\t" + "{:,}".format(station_count_missing) + "\n"
print "FOLDER DETAILS"
- print "Number of stations:\t" + "{:,}".format(station_count_actual)
+ print "Number of stations:\t" + "{:,}".format(sensor_count_actual)
print "Number of files:\t" + "{:,}".format(file_count_actual)
print "Data size:\t\t" + "{:,}".format(data_size_actual) + " Bytes\n"
- def get_progress_csv_row(self, file_name, status, file_count=-1, data_size=-1):
- return file_name + self.SEPERATOR + status + self.SEPERATOR + str(file_count) + self.SEPERATOR + str(data_size) + "\n"
+ def get_progress_csv_row(self, file_name, sensors_status, station_status, file_count=-1, data_size=-1):
+ return file_name + self.SEPERATOR + sensors_status + self.SEPERATOR + station_status + self.SEPERATOR + str(file_count) + self.SEPERATOR + str(data_size) + "\n"
- def update_file_status(self, file_name, status, file_count=-1, data_size=-1):
- for i in range(0, len(self.progress_data)):
- if self.progress_data[i].startswith(file_name):
- self.progress_data[i] = self.get_progress_csv_row(file_name, status, file_count, data_size)
+ def update_file_sensor_status(self, file_name, sensors_status, file_count=-1, data_size=-1):
+ for row in range(0, len(self.progress_data)):
+ if self.progress_data[row].startswith(file_name):
+ station_status = self.progress_data[row].rsplit(self.SEPERATOR)[self.INDEX_DATA_STATION_STATUS]
+ self.progress_data[row] = self.get_progress_csv_row(file_name, sensors_status, station_status, file_count, data_size)
+ break
+
+ # Save the file
+ self.close_progress_data(True)
+
+ def update_file_station_status(self, file_name, station_status):
+ for row in range(0, len(self.progress_data)):
+ if self.progress_data[row].startswith(file_name):
+ row_contents = self.progress_data[row].rsplit(self.SEPERATOR)
+ sensors_status = row_contents[self.INDEX_DATA_SENSORS_STATUS]
+ file_count = int(row_contents[self.INDEX_DATA_FILE_COUNT])
+ data_size = int(row_contents[self.INDEX_DATA_FOLDER_DATA])
+ self.progress_data[row] = self.get_progress_csv_row(file_name, sensors_status, station_status, file_count, data_size)
break
# Save the file
@@ -213,6 +253,13 @@ class WeatherDataFiles:
total_size += os.path.getsize(fp)
return total_size
+ def get_station_status(self, return_value):
+ if return_value == 2:
+ return self.DATA_FILE_DOWNLOADED
+ elif return_value == 1:
+ return self.DATA_FILE_GENERATED
+ return self.DATA_FILE_MISSING
+
def open_progress_data(self):
with open(self.progress_file_name, 'r') as file:
@@ -230,6 +277,9 @@ class WeatherDataFiles:
self.current = self.DATA_FILE_START_INDEX
self.open_progress_data()
+ def set_type(self, type):
+ self.type = type
+
# Iterator Functions
def __iter__(self):
@@ -244,6 +294,8 @@ class WeatherDataFiles:
row = self.progress_data[self.current]
self.current += 1
columns = row.rsplit(self.SEPERATOR)
- if columns[self.INDEX_DATA_STATUS].strip() != self.DATA_FILE_CREATED:
+ if self.type == "sensor" and columns[self.INDEX_DATA_SENSORS_STATUS].strip() != self.DATA_FILE_GENERATED:
+ break
+ elif self.type == "station" and columns[self.INDEX_DATA_STATION_STATUS].strip() != self.DATA_FILE_DOWNLOADED:
break
return columns[self.INDEX_DATA_FILE_NAME]
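Two conventions introduced here are worth spelling out. First, the progress file now tracks sensor and station status in separate columns, so a row written by get_progress_csv_row() looks like this (values are a hypothetical example):

    US1FLSL0019.dly,generated,downloaded,12,48732

Second, copy_to_n_partitions() derives each partition folder name from the standard "1.0_partition" prefix, so with partitions=4 (again hypothetical) the sensor and station files are copied round robin into:

    /tmp/4.0_partition_ghcnd_gsn_xml/
    /tmp/4.1_partition_ghcnd_gsn_xml/
    /tmp/4.2_partition_ghcnd_gsn_xml/
    /tmp/4.3_partition_ghcnd_gsn_xml/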