You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2014/04/02 06:11:56 UTC

[16/50] [abbrv] git commit: Benchmark scripts simplification and update.

Benchmark scripts simplification and update.

* Removed compression.
* Added a print out of the partition scheme.
* Added new queries to the list.


Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/491e5917
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/491e5917
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/491e5917

Branch: refs/heads/prestonc/hash_join
Commit: 491e5917847c4d0dd9041bcf71b8b6712faab067
Parents: 73875c6
Author: Preston Carman <pr...@apache.org>
Authored: Thu Mar 13 23:15:10 2014 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Apr 1 20:56:24 2014 -0700

----------------------------------------------------------------------
 .../scripts/weather_benchmark.py                | 105 ++++++++++++++-----
 .../noaa-ghcn-daily/scripts/weather_cli.py      |  21 ++--
 .../scripts/weather_convert_to_xml.py           |  11 +-
 .../scripts/weather_data_files.py               |  17 ++-
 4 files changed, 108 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/491e5917/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index 66f85d6..0500de3 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -34,7 +34,8 @@ class WeatherBenchmark:
 
     QUERY_REPLACEMENT_KEY = "/tmp/1.0_partition_ghcnd_all_xml/"
     QUERY_MASTER_FOLDER = "../queries/"
-    QUERY_FILE_LIST = ["q00.xq", "q01.xq", "q02.xq", "q03.xq", "q04.xq", "q05.xq"] 
+    QUERY_FILE_LIST = ["q00.xq", "q01.xq", "q02.xq", "q03.xq", "q04.xq", "q05.xq", "q06.xq"] 
+    QUERY_UTILITY_LIST = ["sensor_count.xq", "station_count.xq", "q04_sensor.xq", "q04_station.xq", "q05_sensor.xq", "q05_station.xq", "q06_sensor.xq"] 
     BENCHMARK_LOCAL_TESTS = ["local_speed_up", "local_batch_scale_out"] 
     BENCHMARK_CLUSTER_TESTS = ["speed_up", "batch_scale_out"] 
     QUERY_COLLECTIONS = ["sensors", "stations"]
@@ -47,40 +48,73 @@ class WeatherBenchmark:
         self.dataset = dataset
         self.nodes = nodes
         
-    def build_data_links(self, xml_save_path):
+    def print_partition_scheme(self, xml_save_path):
         if (len(self.base_paths) == 0):
             return
         for test in self.dataset.get_tests():
             if test in self.BENCHMARK_LOCAL_TESTS:
-                self.build_local_data_links(test, xml_save_path)
+                self.print_local_partition_schemes(test, xml_save_path)
             elif test in self.BENCHMARK_CLUSTER_TESTS:
-                self.build_cluster_data_links(test, xml_save_path)
+                self.print_cluster_partition_schemes(test, xml_save_path)
             else:
                 print "Unknown test."
                 exit()
             
-    def build_local_data_links(self, test, xml_save_path):
-        virtual_partitions = get_local_virtual_partitions(self.partitions)
-        data_paths = get_partition_paths(virtual_partitions, self.base_paths)
+    def print_local_partition_schemes(self, test, xml_save_path):
         for i in self.partitions:
-            link_base_paths = get_partition_paths(i, self.base_paths, "data_links/" + test)
-            # Match link paths to real data paths.
-            offset = 0
-            group_size = len(data_paths) / len(link_base_paths)
-            for link_index, link_path in enumerate(link_base_paths):
-                if os.path.isdir(link_path):
-                    shutil.rmtree(link_path)
-                for data_index, data_path in enumerate(data_paths):
-                    if offset <= data_index and data_index < offset + group_size:
-                        self.add_collection_links_for(data_path, link_path, data_index)
-                offset += group_size
+            scheme = self.get_local_partition_scheme(test, xml_save_path, i)
+            virtual_partitions = get_local_virtual_partitions(self.partitions)
+            self.print_partition_schemes(virtual_partitions, scheme, test, i)
+        
+    def print_cluster_partition_schemes(self, test, xml_save_path):
+        scheme = self.get_cluster_partition_scheme(test, xml_save_path)
+        virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
+        self.print_partition_schemes(virtual_partitions, scheme, test, 0)
+        
+    def print_partition_schemes(self, virtual_partitions, scheme, test, partitions):
+        print
+        print "---------------- Partition Scheme --------------------"
+        print "    Test: " + test
+        print "    Virtual Partitions: " + str(virtual_partitions)
+        print "    Disks: " + str(len(self.base_paths))
+        print "    Partitions: " + str(partitions)
+        
+        if len(scheme) > 0:
+            folder_length = len(scheme[0][3]) + 5
+            row_format = "{:>5} {:>5} {:<" + str(folder_length) + "} {:<" + str(folder_length) + "}"
+            HEADER = ("Index", "Link", "Data Path", "Link Path")
+            print row_format.format(*HEADER)
+            for row in scheme:
+                print row_format.format(*row)
+            print
+        else:
+            print "    Scheme is EMPTY."
+
+    def get_local_partition_scheme(self, test, xml_save_path, partition):
+        scheme = []
+        virtual_partitions = get_local_virtual_partitions(self.partitions)
+        data_schems = get_partition_scheme(virtual_partitions, self.base_paths)
+        
+        link_base_schemes = get_partition_scheme(partition, self.base_paths, "data_links/" + test)
+        # Match link paths to real data paths.
+        offset = 0
+        group_size = len(data_schemes) / len(link_base_schemes)
+        for link_disk, link_virtual, link_index, link_path in enumerate(link_base_schemes):
+            for data_disk, data_virtual, data_index, data_path in enumerate(data_schemes):
+                if test == "local_speed_up" and offset <= data_index and data_index < offset + group_size:
+                    scheme.append([data_index, link_index, data_path, link_path])
+                elif test == "local_batch_scale_out" and data_index == link_index:
+                    scheme.append([data_index, link_index, data_path, link_path])
+            offset += group_size
+        return scheme
     
-    def build_cluster_data_links(self, test, xml_save_path):
+    def get_cluster_partition_scheme(self, test, xml_save_path):
         node_index = self.get_current_node_index()
         if node_index == -1:
             print "Unknown host."
             return 
         
+        scheme = []
         virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
         data_paths = get_partition_paths(virtual_partitions, self.base_paths)
         link_base_paths = get_cluster_link_paths(len(self.nodes), self.base_paths, "data_links/" + test)
@@ -88,8 +122,6 @@ class WeatherBenchmark:
         # Match link paths to real data paths.
         link_base_paths.sort()
         for link_index, link_path in enumerate(link_base_paths):
-            if os.path.isdir(link_path):
-                shutil.rmtree(link_path)
             # Prep
             link_offset = link_index % len(self.nodes)
             disk_offset = link_index // len(self.nodes)
@@ -111,8 +143,33 @@ class WeatherBenchmark:
             data_paths.sort()
             for data_index, data_path in enumerate(data_paths):
                 if has_data and node_offset <= data_index and data_index < node_offset + group_size:
-                    self.add_collection_links_for(data_path, link_path, data_index)
-            self.add_collection_links_for("", link_path, -1)
+                    scheme.append([data_index, link_index, data_path, link_path])
+            scheme.append([-1, link_index, "", link_path])
+        return scheme
+    
+    def build_data_links(self, xml_save_path):
+        if (len(self.base_paths) == 0):
+            return
+        for test in self.dataset.get_tests():
+            if test in self.BENCHMARK_LOCAL_TESTS:
+                for i in self.partitions:
+                    scheme = self.get_local_partition_scheme(test, xml_save_path, i)
+                    self.build_data_links_scheme(scheme)
+            elif test in self.BENCHMARK_CLUSTER_TESTS:
+                scheme = self.get_cluster_partition_scheme(test, xml_save_path)
+                self.build_data_links_scheme(scheme)
+            else:
+                print "Unknown test."
+                exit()
+    
+    def build_data_links_scheme(self, scheme):
+        """Build all the data links based on the scheme information."""
+        link_path_cleared = []
+        for (data_index, partition, data_path, link_path) in scheme:
+            if link_path not in link_path_cleared and os.path.isdir(link_path):
+                shutil.rmtree(link_path)
+                link_path_cleared.append(link_path)
+            self.add_collection_links_for(data_path, link_path, data_index)
     
     def get_current_node_index(self):
         found = False
@@ -172,7 +229,7 @@ class WeatherBenchmark:
     def copy_and_replace_query(self, query_path, replacement_list):
         '''Copy the query files over to the query_path and replace the path
         for the where the collection data is located.'''
-        for query_file in self.QUERY_FILE_LIST:
+        for query_file in self.QUERY_FILE_LIST + self.QUERY_UTILITY_LIST:
             shutil.copyfile(self.QUERY_MASTER_FOLDER + query_file, query_path + query_file)
         
             # Make a search replace for each collection.

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/491e5917/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
index a1a1aa2..52945e5 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
@@ -24,7 +24,6 @@ from weather_config import *
 from weather_benchmark import *
 
 DEBUG_OUTPUT = False
-COMPRESSED = False
 
 #
 # Weather conversion for GHCN-DAILY files to xml.
@@ -42,7 +41,7 @@ def main(argv):
     xml_config_path = ""
     
     try:
-        opts, args = getopt.getopt(argv, "acf:hl:m:ruvw:x:", ["file=", "locality=", "max_station_files=", "web_service=", "xml_config="])
+        opts, args = getopt.getopt(argv, "af:hl:m:ruvw:x:", ["file=", "locality=", "max_station_files=", "web_service=", "xml_config="])
     except getopt.GetoptError:
         print 'The file options for weather_cli.py were not correctly specified.'
         print 'To see a full list of options try:'
@@ -52,10 +51,9 @@ def main(argv):
         if opt == '-h':
             print 'Converting weather daily files to xml options:'
             print '    -a        Append the results to the progress file.'
-            print '    -c        Compress the produced XML file with .gz.'
             print '    -f (str)  The file name of a specific station to process.'
             print '              * Helpful when testing a single stations XML file output.'
-            print '    -l (str)  Select the locality of the scripts execution (download, progress_file, sensor_build, station_build, partition, statistics).'
+            print '    -l (str)  Select the locality of the scripts execution (download, progress_file, sensor_build, station_build, partition, partition_scheme, statistics).'
             print '    -m (int)  Limits the number of files created for each station.'
             print '              * Helpful when testing to make sure all elements are supported for each station.'
             print '              Alternate form: --max_station_files=(int)'
@@ -67,9 +65,6 @@ def main(argv):
             sys.exit()
         elif opt in ('-a', "--append"):
             append = True
-        elif opt == '-c':
-            global COMPRESSED
-            COMPRESSED = True
         elif opt in ('-f', "--file"):
             # check if file exists.
             if os.path.exists(arg):
@@ -78,7 +73,7 @@ def main(argv):
                 print 'Error: Argument must be a file name for --file (-f).'
                 sys.exit()
         elif opt in ('-l', "--locality"):
-            if arg in ("download", "progress_file", "sensor_build", "station_build", "partition", "test_links", "queries", "statistics"):
+            if arg in ("download", "progress_file", "sensor_build", "station_build", "partition", "partition_scheme", "test_links", "queries", "statistics"):
                 section = arg
             else:
                 print 'Error: Argument must be a string for --locality (-l) and a valid locality.'
@@ -144,7 +139,7 @@ def main(argv):
         os.makedirs(xml_data_save_path)
 
     # Set up the XML build objects.
-    convert = WeatherWebServiceMonthlyXMLFile(download_path, xml_data_save_path, COMPRESSED, DEBUG_OUTPUT)
+    convert = WeatherWebServiceMonthlyXMLFile(download_path, xml_data_save_path, DEBUG_OUTPUT)
     progress_file = xml_data_save_path + "_data_progress.csv"
     data = WeatherDataFiles(ghcnd_data_dly_path, progress_file)
     if section in ("all", "progress_file"):
@@ -207,15 +202,19 @@ def main(argv):
             base_paths.append(paths + dataset_folder + "/")
         benchmark = WeatherBenchmark(base_paths, dataset.get_partitions(), dataset, config.get_node_machine_list())
         
-        if section in ("all", "partition"):
+        if section in ("all", "partition", "partition_scheme"):
             slices = benchmark.get_number_of_slices()
             print 'Processing the partition section (' + dataset.get_name() + ':d' + str(len(base_paths)) + ':s' + str(slices) + ').'
             data.reset()
-            data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset)
+            if section == "partition_scheme":
+                benchmark.print_partition_scheme(xml_data_save_path)
+            else:
+                data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset)
     
         if section in ("all", "test_links"):
             # TODO determine current node 
             print 'Processing the test links section (' + dataset.get_name() + ').'
+            benchmark.print_partition_scheme(xml_data_save_path)
             benchmark.build_data_links(xml_data_save_path)
 
         if section in ("all", "queries"):

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/491e5917/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
index a7adce5..c115efa 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py
@@ -17,7 +17,6 @@
 import textwrap
 from datetime import date
 import os
-import gzip
 from collections import OrderedDict
 
 # Custom modules.
@@ -104,9 +103,8 @@ class WeatherConvertToXML:
     
     token = ""
     
-    def __init__(self, base_path, save_path, compressed, debug_output):
+    def __init__(self, base_path, save_path, debug_output):
         self.save_path = save_path
-        self.compressed = compressed
         self.debug_output = debug_output
 
         # Extra support files.
@@ -134,14 +132,9 @@ class WeatherConvertToXML:
             print str(field[FIELD_INDEX_NAME]) + " = '" + row[(field[FIELD_INDEX_START] - 1):field[FIELD_INDEX_END]] + "'"
     
     def save_file(self, filename, contents):
-        if self.compressed:
-            filename = filename + '.gz'
-            file = gzip.open(filename, 'wb')
-        else:
-            file = open(filename, 'w')
+        file = open(filename, 'w')
         file.write(contents)
         file.close()
-        
         return filename
     
     def get_folder_size(self, folder_name):

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/491e5917/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
index a9b4ecc..a68cba1 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
@@ -318,13 +318,26 @@ class WeatherDataFiles:
         return columns[self.INDEX_DATA_FILE_NAME]
     
     
+# Index values of each field details.
+PARTITION_INDEX_DISK = 0
+PARTITION_INDEX_VIRTUAL = 1
+PARTITION_INDEX = 2
+PARTITION_INDEX_PATH = 3
+PARTITION_HEADER = ("Disk", "Virtual", "Index", "Path")
+            
 def get_partition_paths(partitions, base_paths, key="partitions"):        
     partition_paths = []
+    for scheme in get_partition_scheme(partitions, base_paths, key):
+        partition_paths.append(scheme[PARTITION_INDEX_PATH])
+    return partition_paths
+
+def get_partition_scheme(partitions, base_paths, key="partitions"):        
+    partition_scheme = []
     for i in range(0, partitions):
         for j in range(0, len(base_paths)):
             new_partition_path = base_paths[j] + key + "/" + get_partition_folder(j, partitions, i) + "/"
-            partition_paths.append(new_partition_path)
-    return partition_paths
+            partition_scheme.append((j, partitions, i, new_partition_path))
+    return partition_scheme
 
 def get_partition_folder(disks, partitions, index):        
     return "d" + str(disks) + "_p" + str(partitions) + "_i" + str(index)