You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2014/04/02 06:11:56 UTC
[16/50] [abbrv] git commit: Benchmark scripts simplification and
update.
Benchmark scripts simplification and update.
* Removed compression.
* Added a print out of the partition scheme.
* Added new queries to the list.
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/491e5917
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/491e5917
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/491e5917
Branch: refs/heads/prestonc/hash_join
Commit: 491e5917847c4d0dd9041bcf71b8b6712faab067
Parents: 73875c6
Author: Preston Carman <pr...@apache.org>
Authored: Thu Mar 13 23:15:10 2014 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Apr 1 20:56:24 2014 -0700
----------------------------------------------------------------------
.../scripts/weather_benchmark.py | 105 ++++++++++++++-----
.../noaa-ghcn-daily/scripts/weather_cli.py | 21 ++--
.../scripts/weather_convert_to_xml.py | 11 +-
.../scripts/weather_data_files.py | 17 ++-
4 files changed, 108 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/491e5917/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index 66f85d6..0500de3 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -34,7 +34,8 @@ class WeatherBenchmark:
QUERY_REPLACEMENT_KEY = "/tmp/1.0_partition_ghcnd_all_xml/"
QUERY_MASTER_FOLDER = "../queries/"
- QUERY_FILE_LIST = ["q00.xq", "q01.xq", "q02.xq", "q03.xq", "q04.xq", "q05.xq"]
+ QUERY_FILE_LIST = ["q00.xq", "q01.xq", "q02.xq", "q03.xq", "q04.xq", "q05.xq", "q06.xq"]
+ QUERY_UTILITY_LIST = ["sensor_count.xq", "station_count.xq", "q04_sensor.xq", "q04_station.xq", "q05_sensor.xq", "q05_station.xq", "q06_sensor.xq"]
BENCHMARK_LOCAL_TESTS = ["local_speed_up", "local_batch_scale_out"]
BENCHMARK_CLUSTER_TESTS = ["speed_up", "batch_scale_out"]
QUERY_COLLECTIONS = ["sensors", "stations"]
@@ -47,40 +48,73 @@ class WeatherBenchmark:
self.dataset = dataset
self.nodes = nodes
- def build_data_links(self, xml_save_path):
+ def print_partition_scheme(self, xml_save_path):
if (len(self.base_paths) == 0):
return
for test in self.dataset.get_tests():
if test in self.BENCHMARK_LOCAL_TESTS:
- self.build_local_data_links(test, xml_save_path)
+ self.print_local_partition_schemes(test, xml_save_path)
elif test in self.BENCHMARK_CLUSTER_TESTS:
- self.build_cluster_data_links(test, xml_save_path)
+ self.print_cluster_partition_schemes(test, xml_save_path)
else:
print "Unknown test."
exit()
- def build_local_data_links(self, test, xml_save_path):
- virtual_partitions = get_local_virtual_partitions(self.partitions)
- data_paths = get_partition_paths(virtual_partitions, self.base_paths)
+ def print_local_partition_schemes(self, test, xml_save_path):
for i in self.partitions:
- link_base_paths = get_partition_paths(i, self.base_paths, "data_links/" + test)
- # Match link paths to real data paths.
- offset = 0
- group_size = len(data_paths) / len(link_base_paths)
- for link_index, link_path in enumerate(link_base_paths):
- if os.path.isdir(link_path):
- shutil.rmtree(link_path)
- for data_index, data_path in enumerate(data_paths):
- if offset <= data_index and data_index < offset + group_size:
- self.add_collection_links_for(data_path, link_path, data_index)
- offset += group_size
+ scheme = self.get_local_partition_scheme(test, xml_save_path, i)
+ virtual_partitions = get_local_virtual_partitions(self.partitions)
+ self.print_partition_schemes(virtual_partitions, scheme, test, i)
+
+ def print_cluster_partition_schemes(self, test, xml_save_path):
+ scheme = self.get_cluster_partition_scheme(test, xml_save_path)
+ virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
+ self.print_partition_schemes(virtual_partitions, scheme, test, 0)
+
+ def print_partition_schemes(self, virtual_partitions, scheme, test, partitions):
+ print
+ print "---------------- Partition Scheme --------------------"
+ print " Test: " + test
+ print " Virtual Partitions: " + str(virtual_partitions)
+ print " Disks: " + str(len(self.base_paths))
+ print " Partitions: " + str(partitions)
+
+ if len(scheme) > 0:
+ folder_length = len(scheme[0][3]) + 5
+ row_format = "{:>5} {:>5} {:<" + str(folder_length) + "} {:<" + str(folder_length) + "}"
+ HEADER = ("Index", "Link", "Data Path", "Link Path")
+ print row_format.format(*HEADER)
+ for row in scheme:
+ print row_format.format(*row)
+ print
+ else:
+ print " Scheme is EMPTY."
+
+ def get_local_partition_scheme(self, test, xml_save_path, partition):
+ scheme = []
+ virtual_partitions = get_local_virtual_partitions(self.partitions)
+ data_schemes = get_partition_scheme(virtual_partitions, self.base_paths)
+
+ link_base_schemes = get_partition_scheme(partition, self.base_paths, "data_links/" + test)
+ # Match link paths to real data paths.
+ offset = 0
+ group_size = len(data_schemes) / len(link_base_schemes)
+ for link_disk, link_virtual, link_index, link_path in link_base_schemes:
+ for data_disk, data_virtual, data_index, data_path in data_schemes:
+ if test == "local_speed_up" and offset <= data_index and data_index < offset + group_size:
+ scheme.append([data_index, link_index, data_path, link_path])
+ elif test == "local_batch_scale_out" and data_index == link_index:
+ scheme.append([data_index, link_index, data_path, link_path])
+ offset += group_size
+ return scheme
- def build_cluster_data_links(self, test, xml_save_path):
+ def get_cluster_partition_scheme(self, test, xml_save_path):
node_index = self.get_current_node_index()
if node_index == -1:
print "Unknown host."
return
+ scheme = []
virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
data_paths = get_partition_paths(virtual_partitions, self.base_paths)
link_base_paths = get_cluster_link_paths(len(self.nodes), self.base_paths, "data_links/" + test)
@@ -88,8 +122,6 @@ class WeatherBenchmark:
# Match link paths to real data paths.
link_base_paths.sort()
for link_index, link_path in enumerate(link_base_paths):
- if os.path.isdir(link_path):
- shutil.rmtree(link_path)
# Prep
link_offset = link_index % len(self.nodes)
disk_offset = link_index // len(self.nodes)
@@ -111,8 +143,33 @@ class WeatherBenchmark:
data_paths.sort()
for data_index, data_path in enumerate(data_paths):
if has_data and node_offset <= data_index and data_index < node_offset + group_size:
- self.add_collection_links_for(data_path, link_path, data_index)
- self.add_collection_links_for("", link_path, -1)
+ scheme.append([data_index, link_index, data_path, link_path])
+ scheme.append([-1, link_index, "", link_path])
+ return scheme
+
+ def build_data_links(self, xml_save_path):
+ if (len(self.base_paths) == 0):
+ return
+ for test in self.dataset.get_tests():
+ if test in self.BENCHMARK_LOCAL_TESTS:
+ for i in self.partitions:
+ scheme = self.get_local_partition_scheme(test, xml_save_path, i)
+ self.build_data_links_scheme(scheme)
+ elif test in self.BENCHMARK_CLUSTER_TESTS:
+ scheme = self.get_cluster_partition_scheme(test, xml_save_path)
+ self.build_data_links_scheme(scheme)
+ else:
+ print "Unknown test."
+ exit()
+
+ def build_data_links_scheme(self, scheme):
+ """Build all the data links based on the scheme information."""
+ link_path_cleared = []
+ for (data_index, partition, data_path, link_path) in scheme:
+ if link_path not in link_path_cleared and os.path.isdir(link_path):
+ shutil.rmtree(link_path)
+ link_path_cleared.append(link_path)
+ self.add_collection_links_for(data_path, link_path, data_index)
def get_current_node_index(self):
found = False
@@ -172,7 +229,7 @@ class WeatherBenchmark:
def copy_and_replace_query(self, query_path, replacement_list):
'''Copy the query files over to the query_path and replace the path
for the where the collection data is located.'''
- for query_file in self.QUERY_FILE_LIST:
+ for query_file in self.QUERY_FILE_LIST + self.QUERY_UTILITY_LIST:
shutil.copyfile(self.QUERY_MASTER_FOLDER + query_file, query_path + query_file)
# Make a search replace for each collection.
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/491e5917/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
index a1a1aa2..52945e5 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
@@ -24,7 +24,6 @@ from weather_config import *
from weather_benchmark import *
DEBUG_OUTPUT = False
-COMPRESSED = False
#
# Weather conversion for GHCN-DAILY files to xml.
@@ -42,7 +41,7 @@ def main(argv):
xml_config_path = ""
try:
- opts, args = getopt.getopt(argv, "acf:hl:m:ruvw:x:", ["file=", "locality=", "max_station_files=", "web_service=", "xml_config="])
+ opts, args = getopt.getopt(argv, "af:hl:m:ruvw:x:", ["file=", "locality=", "max_station_files=", "web_service=", "xml_config="])
except getopt.GetoptError:
print 'The file options for weather_cli.py were not correctly specified.'
print 'To see a full list of options try:'
@@ -52,10 +51,9 @@ def main(argv):
if opt == '-h':
print 'Converting weather daily files to xml options:'
print ' -a Append the results to the progress file.'
- print ' -c Compress the produced XML file with .gz.'
print ' -f (str) The file name of a specific station to process.'
print ' * Helpful when testing a single stations XML file output.'
- print ' -l (str) Select the locality of the scripts execution (download, progress_file, sensor_build, station_build, partition, statistics).'
+ print ' -l (str) Select the locality of the scripts execution (download, progress_file, sensor_build, station_build, partition, partition_scheme, statistics).'
print ' -m (int) Limits the number of files created for each station.'
print ' * Helpful when testing to make sure all elements are supported for each station.'
print ' Alternate form: --max_station_files=(int)'
@@ -67,9 +65,6 @@ def main(argv):
sys.exit()
elif opt in ('-a', "--append"):
append = True
- elif opt == '-c':
- global COMPRESSED
- COMPRESSED = True
elif opt in ('-f', "--file"):
# check if file exists.
if os.path.exists(arg):
@@ -78,7 +73,7 @@ def main(argv):
print 'Error: Argument must be a file name for --file (-f).'
sys.exit()
elif opt in ('-l', "--locality"):
- if arg in ("download", "progress_file", "sensor_build", "station_build", "partition", "test_links", "queries", "statistics"):
+ if arg in ("download", "progress_file", "sensor_build", "station_build", "partition", "partition_scheme", "test_links", "queries", "statistics"):
section = arg
else:
print 'Error: Argument must be a string for --locality (-l) and a valid locality.'
@@ -144,7 +139,7 @@ def main(argv):
os.makedirs(xml_data_save_path)
# Set up the XML build objects.
- convert = WeatherWebServiceMonthlyXMLFile(download_path, xml_data_save_path, COMPRESSED, DEBUG_OUTPUT)
+ convert = WeatherWebServiceMonthlyXMLFile(download_path, xml_data_save_path, DEBUG_OUTPUT)
progress_file = xml_data_save_path + "_data_progress.csv"
data = WeatherDataFiles(ghcnd_data_dly_path, progress_file)
if section in ("all", "progress_file"):
@@ -207,15 +202,19 @@ def main(argv):
base_paths.append(paths + dataset_folder + "/")
benchmark = WeatherBenchmark(base_paths, dataset.get_partitions(), dataset, config.get_node_machine_list())
- if section in ("all", "partition"):
+ if section in ("all", "partition", "partition_scheme"):
slices = benchmark.get_number_of_slices()
print 'Processing the partition section (' + dataset.get_name() + ':d' + str(len(base_paths)) + ':s' + str(slices) + ').'
data.reset()
- data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset)
+ if section == "partition_scheme":
+ benchmark.print_partition_scheme(xml_data_save_path)
+ else:
+ data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset)
if section in ("all", "test_links"):
# TODO determine current node
print 'Processing the test links section (' + dataset.get_name() + ').'
+ benchmark.print_partition_scheme(xml_data_save_path)
benchmark.build_data_links(xml_data_save_path)
if section in ("all", "queries"):
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/491e5917/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
index a7adce5..c115efa 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
@@ -17,7 +17,6 @@
import textwrap
from datetime import date
import os
-import gzip
from collections import OrderedDict
# Custom modules.
@@ -104,9 +103,8 @@ class WeatherConvertToXML:
token = ""
- def __init__(self, base_path, save_path, compressed, debug_output):
+ def __init__(self, base_path, save_path, debug_output):
self.save_path = save_path
- self.compressed = compressed
self.debug_output = debug_output
# Extra support files.
@@ -134,14 +132,9 @@ class WeatherConvertToXML:
print str(field[FIELD_INDEX_NAME]) + " = '" + row[(field[FIELD_INDEX_START] - 1):field[FIELD_INDEX_END]] + "'"
def save_file(self, filename, contents):
- if self.compressed:
- filename = filename + '.gz'
- file = gzip.open(filename, 'wb')
- else:
- file = open(filename, 'w')
+ file = open(filename, 'w')
file.write(contents)
file.close()
-
return filename
def get_folder_size(self, folder_name):
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/491e5917/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
index a9b4ecc..a68cba1 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
@@ -318,13 +318,26 @@ class WeatherDataFiles:
return columns[self.INDEX_DATA_FILE_NAME]
+# Index values of each field details.
+PARTITION_INDEX_DISK = 0
+PARTITION_INDEX_VIRTUAL = 1
+PARTITION_INDEX = 2
+PARTITION_INDEX_PATH = 3
+PARTITION_HEADER = ("Disk", "Virtual", "Index", "Path")
+
def get_partition_paths(partitions, base_paths, key="partitions"):
partition_paths = []
+ for scheme in get_partition_scheme(partitions, base_paths, key):
+ partition_paths.append(scheme[PARTITION_INDEX_PATH])
+ return partition_paths
+
+def get_partition_scheme(partitions, base_paths, key="partitions"):
+ partition_scheme = []
for i in range(0, partitions):
for j in range(0, len(base_paths)):
new_partition_path = base_paths[j] + key + "/" + get_partition_folder(j, partitions, i) + "/"
- partition_paths.append(new_partition_path)
- return partition_paths
+ partition_scheme.append((j, partitions, i, new_partition_path))
+ return partition_scheme
def get_partition_folder(disks, partitions, index):
return "d" + str(disks) + "_p" + str(partitions) + "_i" + str(index)