You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2014/03/12 21:14:45 UTC
[2/5] git commit: Added a reset option to partitions and queries.
Added a reset option to partitions and queries.
The new option removes all existing data before the files are copied, so each run starts from a clean state.
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/2bc25a10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/2bc25a10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/2bc25a10
Branch: refs/heads/prestonc/benchmarks_staging
Commit: 2bc25a10517d80c2d9277b2be8ab73e6557ad1c2
Parents: 9456d4a
Author: Preston Carman <pr...@apache.org>
Authored: Thu Mar 6 20:04:52 2014 -0800
Committer: Preston Carman <pr...@apache.org>
Committed: Thu Mar 6 20:04:52 2014 -0800
----------------------------------------------------------------------
.../scripts/weather_benchmark.py | 23 +++++++++-----------
.../noaa-ghcn-daily/scripts/weather_cli.py | 4 ++--
.../scripts/weather_convert_to_xml.py | 2 +-
.../scripts/weather_data_files.py | 14 ++++++++----
4 files changed, 23 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/2bc25a10/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index 8a032c2..66f85d6 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -68,7 +68,9 @@ class WeatherBenchmark:
offset = 0
group_size = len(data_paths) / len(link_base_paths)
for link_index, link_path in enumerate(link_base_paths):
- for data_index, data_path in enumerate(data_paths):
+ if os.path.isdir(link_path):
+ shutil.rmtree(link_path)
+ for data_index, data_path in enumerate(data_paths):
if offset <= data_index and data_index < offset + group_size:
self.add_collection_links_for(data_path, link_path, data_index)
offset += group_size
@@ -134,38 +136,34 @@ class WeatherBenchmark:
if index >= 0:
os.symlink(real_path + collection + "/", collection_path + "index" + str(index))
- def copy_query_files(self):
+ def copy_query_files(self, reset):
for test in self.dataset.get_tests():
if test in self.BENCHMARK_LOCAL_TESTS:
- self.copy_local_query_files(test)
+ self.copy_local_query_files(test, reset)
elif test in self.BENCHMARK_CLUSTER_TESTS:
- self.copy_cluster_query_files(test)
+ self.copy_cluster_query_files(test, reset)
else:
print "Unknown test."
exit()
- def copy_cluster_query_files(self, test):
+ def copy_cluster_query_files(self, test, reset):
'''Determine the data_link path for cluster query files and copy with
new location for collection.'''
partitions = self.dataset.get_partitions()[0]
for i in range(len(self.nodes)):
query_path = get_cluster_query_path(self.base_paths, test, i)
-
- if not os.path.isdir(query_path):
- os.makedirs(query_path)
+ prepare_path(query_path, reset)
# Copy query files.
partition_paths = get_cluster_link_paths_for_node(i, self.base_paths, "data_links/" + test)
self.copy_and_replace_query(query_path, partition_paths)
- def copy_local_query_files(self, test):
+ def copy_local_query_files(self, test, reset):
'''Determine the data_link path for local query files and copy with
new location for collection.'''
for i in self.partitions:
query_path = get_local_query_path(self.base_paths, test, i)
-
- if not os.path.isdir(query_path):
- os.makedirs(query_path)
+ prepare_path(query_path, reset)
# Copy query files.
partition_paths = get_partition_paths(i, self.base_paths, "data_links/" + test)
@@ -200,7 +198,6 @@ class WeatherBenchmark:
print "Unknown test."
exit()
-
def get_cluster_link_paths(nodes, base_paths, key="partitions"):
link_paths = []
for i in range(0, nodes):
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/2bc25a10/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
index 8d18607..a1a1aa2 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
@@ -211,7 +211,7 @@ def main(argv):
slices = benchmark.get_number_of_slices()
print 'Processing the partition section (' + dataset.get_name() + ':d' + str(len(base_paths)) + ':s' + str(slices) + ').'
data.reset()
- data.copy_to_n_partitions(xml_data_save_path, slices, base_paths)
+ data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset)
if section in ("all", "test_links"):
# TODO determine current node
@@ -220,7 +220,7 @@ def main(argv):
if section in ("all", "queries"):
print 'Processing the queries section (' + dataset.get_name() + ').'
- benchmark.copy_query_files()
+ benchmark.copy_query_files(reset)
# if section in ("statistics"):
# print 'Processing the statistics section.'
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/2bc25a10/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
index 1aee4a7..a7adce5 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
@@ -376,7 +376,7 @@ class WeatherConvertToXML:
xml_station += self.default_xml_element("elevation", elevation, 2)
state_code = self.get_field_from_definition(station_ghcnd_row, STATIONS_FIELDS['STATE']).strip()
- if state_code != "":
+ if state_code != "" and state_code in self.STATES:
xml_station += self.default_xml_location_labels("ST", "FIPS:" + str(self.STATES.keys().index(state_code)), self.STATES[state_code])
# Add the MSHR data to the station generated information.
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/2bc25a10/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
index da2afcc..a9b4ecc 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
@@ -92,7 +92,7 @@ class WeatherDataFiles:
self.close_progress_data(True)
self.reset()
- def copy_to_n_partitions(self, save_path, partitions, base_paths=[]):
+ def copy_to_n_partitions(self, save_path, partitions, base_paths, reset):
"""Once the initial data has been generated, the data can be copied into a set number of partitions. """
if (len(base_paths) == 0):
return
@@ -103,8 +103,7 @@ class WeatherDataFiles:
for path in partition_paths:
partition_sizes.append(0)
# Make sure the xml folder is available.
- if not os.path.isdir(path):
- os.makedirs(path)
+ prepare_path(path, reset)
# copy stations and sensors into each partition
current_partition = 0
@@ -125,7 +124,6 @@ class WeatherDataFiles:
if os.path.isdir(file_path):
distutils.dir_util.copy_tree(file_path, new_file_path)
partition_sizes[current_partition] += size
-
# Copy station files
type = "stations"
@@ -319,6 +317,7 @@ class WeatherDataFiles:
break
return columns[self.INDEX_DATA_FILE_NAME]
+
def get_partition_paths(partitions, base_paths, key="partitions"):
partition_paths = []
for i in range(0, partitions):
@@ -330,4 +329,11 @@ def get_partition_paths(partitions, base_paths, key="partitions"):
def get_partition_folder(disks, partitions, index):
return "d" + str(disks) + "_p" + str(partitions) + "_i" + str(index)
+def prepare_path(path, reset):
+ """Ensures the directory is available. If reset, then its a brand new directory."""
+ if os.path.isdir(path) and reset:
+ shutil.rmtree(path)
+
+ if not os.path.isdir(path):
+ os.makedirs(path)