You are viewing a plain text version of this content; the canonical link to the original was removed during plain-text conversion and is available in the HTML version of this message.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2014/04/02 06:12:14 UTC

[34/50] [abbrv] git commit: Changing benchmark scripts to support local partitions on a cluster.

Changing benchmark scripts to support local partitions on a cluster.


Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/ea297e72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/ea297e72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/ea297e72

Branch: refs/heads/prestonc/hash_join
Commit: ea297e72bd00d15649fa47991833d114c93c3821
Parents: 0b1033f
Author: Preston Carman <pr...@apache.org>
Authored: Mon Mar 17 18:08:54 2014 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Apr 1 20:56:25 2014 -0700

----------------------------------------------------------------------
 .../scripts/weather_benchmark.py                | 104 +++++++++----------
 .../scripts/weather_data_files.py               |  21 ++--
 2 files changed, 60 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/ea297e72/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index 6374d38..fdd08ec 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -61,28 +61,32 @@ class WeatherBenchmark:
                 exit()
             
     def print_local_partition_schemes(self, test, xml_save_path):
+        node_index = 0
+        virtual_partitions = get_local_virtual_partitions(self.partitions)
         for i in self.partitions:
             scheme = self.get_local_partition_scheme(test, xml_save_path, i)
-            virtual_partitions = get_local_virtual_partitions(self.partitions)
-            self.print_partition_schemes(virtual_partitions, scheme, test, i)
+            self.print_partition_schemes(virtual_partitions, scheme, test, i, node_index)
         
     def print_cluster_partition_schemes(self, test, xml_save_path):
-        scheme = self.get_cluster_partition_scheme(test, xml_save_path)
+        node_index = self.get_current_node_index()
         virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
-        self.print_partition_schemes(virtual_partitions, scheme, test, 0)
+        for i in self.partitions:
+            scheme = self.get_cluster_partition_scheme(test, xml_save_path, i)
+            self.print_partition_schemes(virtual_partitions, scheme, test, i, node_index)
         
-    def print_partition_schemes(self, virtual_partitions, scheme, test, partitions):
+    def print_partition_schemes(self, virtual_partitions, scheme, test, partitions, node_id):
         print
         print "---------------- Partition Scheme --------------------"
         print "    Test: " + test
         print "    Virtual Partitions: " + str(virtual_partitions)
         print "    Disks: " + str(len(self.base_paths))
         print "    Partitions: " + str(partitions)
+        print "    Node Id: " + str(node_id)
         
         if len(scheme) > 0:
             folder_length = len(scheme[0][3]) + 5
-            row_format = "{:>5} {:>5} {:<" + str(folder_length) + "} {:<" + str(folder_length) + "}"
-            HEADER = ("Index", "Link", "Data Path", "Link Path")
+            row_format = "{:>5} {:>5} {:>5} {:<" + str(folder_length) + "} {:<" + str(folder_length) + "}"
+            HEADER = ("Disk", "Index", "Link", "Data Path", "Link Path")
             print row_format.format(*HEADER)
             for row in scheme:
                 print row_format.format(*row)
@@ -93,14 +97,14 @@ class WeatherBenchmark:
     def get_local_partition_scheme(self, test, xml_save_path, partition):
         scheme = []
         virtual_partitions = get_local_virtual_partitions(self.partitions)
-        data_schems = get_partition_scheme(virtual_partitions, self.base_paths)
+        data_schemes = get_partition_scheme(0, virtual_partitions, self.base_paths)
         
-        link_base_schemes = get_partition_scheme(partition, self.base_paths, "data_links/" + test)
+        link_base_schemes = get_partition_scheme(0, partition, self.base_paths, "data_links/" + test)
         # Match link paths to real data paths.
         offset = 0
         group_size = len(data_schemes) / len(link_base_schemes)
-        for link_disk, link_virtual, link_index, link_path in enumerate(link_base_schemes):
-            for data_disk, data_virtual, data_index, data_path in enumerate(data_schemes):
+        for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes:
+            for data_node, data_disk, data_virtual, data_index, data_path in data_schemes:
                 if test == "local_speed_up" and offset <= data_index and data_index < offset + group_size:
                     scheme.append([data_index, link_index, data_path, link_path])
                 elif test == "local_batch_scale_out" and data_index == link_index:
@@ -108,43 +112,39 @@ class WeatherBenchmark:
             offset += group_size
         return scheme
     
-    def get_cluster_partition_scheme(self, test, xml_save_path):
+    def get_cluster_partition_scheme(self, test, xml_save_path, partition):
         node_index = self.get_current_node_index()
         if node_index == -1:
             print "Unknown host."
             return 
         
         scheme = []
+        local_virtual_partitions = get_local_virtual_partitions(self.partitions)
         virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
-        data_paths = get_partition_paths(virtual_partitions, self.base_paths)
-        link_base_paths = get_cluster_link_paths(len(self.nodes), self.base_paths, "data_links/" + test)
+        data_schemes = get_partition_scheme(node_index, virtual_partitions, self.base_paths)
+        link_base_schemes = get_cluster_link_scheme(len(self.nodes), partition, self.base_paths, "data_links/" + test)
 
         # Match link paths to real data paths.
-        link_base_paths.sort()
-        for link_index, link_path in enumerate(link_base_paths):
+        for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes:
             # Prep
-            link_offset = link_index % len(self.nodes)
-            disk_offset = link_index // len(self.nodes)
             if test == "speed_up":
-                group_size = len(data_paths) / (link_offset + 1) / (len(self.base_paths))
+                group_size = virtual_partitions / (link_node + 1)
             elif test == "batch_scale_out":
-                group_size = len(data_paths) / len(self.nodes) / (len(self.base_paths))
+                group_size = virtual_partitions / len(self.nodes)
             else:
                 print "Unknown test."
                 return
-            node_offset = group_size * node_index
-            for j in range(disk_offset):
-                node_offset += len(data_paths) / (len(self.base_paths))
+            group_size = group_size / link_virtual
+            node_offset = group_size * (node_index * local_virtual_partitions)
+            node_offset += group_size * link_index
             has_data = True
-            if link_offset < node_index:
+            if link_node < node_index:
                 has_data = False
-                    
             # Make links
-            data_paths.sort()
-            for data_index, data_path in enumerate(data_paths):
-                if has_data and node_offset <= data_index and data_index < node_offset + group_size:
-                    scheme.append([data_index, link_index, data_path, link_path])
-            scheme.append([-1, link_index, "", link_path])
+            for date_node, data_disk, data_virtual, data_index, data_path in data_schemes:
+                if has_data and data_disk == link_disk and node_offset <= data_index and data_index < node_offset + group_size:
+                    scheme.append([link_disk, data_index, link_index, data_path, link_path])
+            scheme.append([link_disk, -1, link_index, "", link_path])
         return scheme
     
     def build_data_links(self, xml_save_path):
@@ -156,8 +156,9 @@ class WeatherBenchmark:
                     scheme = self.get_local_partition_scheme(test, xml_save_path, i)
                     self.build_data_links_scheme(scheme)
             elif test in self.BENCHMARK_CLUSTER_TESTS:
-                scheme = self.get_cluster_partition_scheme(test, xml_save_path)
-                self.build_data_links_scheme(scheme)
+                for i in self.partitions:
+                    scheme = self.get_cluster_partition_scheme(test, xml_save_path, i)
+                    self.build_data_links_scheme(scheme)
             else:
                 print "Unknown test."
                 exit()
@@ -207,13 +208,14 @@ class WeatherBenchmark:
         '''Determine the data_link path for cluster query files and copy with
         new location for collection.'''
         partitions = self.dataset.get_partitions()[0]
-        for i in range(len(self.nodes)):
-            query_path = get_cluster_query_path(self.base_paths, test, i)
-            prepare_path(query_path, reset)
-        
-            # Copy query files.
-            partition_paths = get_cluster_link_paths_for_node(i, self.base_paths, "data_links/" + test)
-            self.copy_and_replace_query(query_path, partition_paths)
+        for i in self.partitions:
+            for j in range(len(self.nodes)):
+                query_path = get_cluster_query_path(self.base_paths, test, i, j)
+                prepare_path(query_path, reset)
+            
+                # Copy query files.
+                partition_paths = get_cluster_link_paths_for_node(i, self.base_paths, "data_links/" + test)
+                self.copy_and_replace_query(query_path, partition_paths)
 
     def copy_local_query_files(self, test, reset):
         '''Determine the data_link path for local query files and copy with
@@ -223,7 +225,7 @@ class WeatherBenchmark:
             prepare_path(query_path, reset)
     
             # Copy query files.
-            partition_paths = get_partition_paths(i, self.base_paths, "data_links/" + test)
+            partition_paths = get_partition_paths(0, i, self.base_paths, "data_links/" + test)
             self.copy_and_replace_query(query_path, partition_paths)
 
     def copy_and_replace_query(self, query_path, replacement_list):
@@ -255,34 +257,26 @@ class WeatherBenchmark:
                     print "Unknown test."
                     exit()
 
-def get_cluster_link_paths(nodes, base_paths, key="partitions"):        
+def get_cluster_link_scheme(nodes, partition, base_paths, key="partitions"):        
     link_paths = []
     for i in range(0, nodes):
-        new_link_path = get_cluster_link_paths_for_node(i, base_paths, key)
+        new_link_path = get_partition_scheme(i, partition, base_paths, key + "/" + str(i) + "nodes")
         link_paths.extend(new_link_path)
     return link_paths
 
-def get_cluster_link_paths_for_node(node_id, base_paths, key="partitions"):        
-    link_paths = []
-    for j in range(0, len(base_paths)):
-        new_link_path = base_paths[j] + key + "/" + str(node_id) + "nodes/"
-        link_paths.append(new_link_path)
-    return link_paths
-
 def get_local_query_path(base_paths, test, partition):        
     return base_paths[0] + "queries/" + test + "/" + get_local_query_folder(len(base_paths), partition) + "/"
 
 def get_local_query_folder(disks, partitions):        
     return "d" + str(disks) + "_p" + str(partitions)
 
-def get_cluster_query_path(base_paths, test, nodes):        
-    return base_paths[0] + "queries/" + test + "/" + str(nodes) + "nodes/"
+def get_cluster_query_path(base_paths, test, partition, nodes):        
+    return base_paths[0] + "queries/" + test + "/" + str(nodes) + "nodes/" + get_local_query_folder(len(base_paths), partition) + "/"
 
 def get_cluster_virtual_partitions(nodes, partitions):
-    if len(partitions) != 1:
-        print "Cluster configurations must only have one partition."
-        exit()
-    return calculate_partitions(range(len(nodes), 0, -1))
+    vp = get_local_virtual_partitions(partitions)
+    vn = calculate_partitions(range(len(nodes), 0, -1))
+    return vp * vn
 
 def get_local_virtual_partitions(partitions):
     return calculate_partitions(partitions)

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/ea297e72/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
index a68cba1..64e19d6 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
@@ -99,7 +99,7 @@ class WeatherDataFiles:
         
         # Initialize the partition paths.
         partition_sizes = []
-        partition_paths = get_partition_paths(partitions, base_paths)
+        partition_paths = get_partition_paths(0, partitions, base_paths)
         for path in partition_paths:
             partition_sizes.append(0)
             # Make sure the xml folder is available.
@@ -319,24 +319,25 @@ class WeatherDataFiles:
     
     
 # Index values of each field details.
-PARTITION_INDEX_DISK = 0
-PARTITION_INDEX_VIRTUAL = 1
-PARTITION_INDEX = 2
-PARTITION_INDEX_PATH = 3
-PARTITION_HEADER = ("Disk", "Virtual", "Index", "Path")
+PARTITION_INDEX_NODE = 0
+PARTITION_INDEX_DISK = 1
+PARTITION_INDEX_VIRTUAL = 2
+PARTITION_INDEX = 3
+PARTITION_INDEX_PATH = 4
+PARTITION_HEADER = ("Node", "Disk", "Virtual", "Index", "Path")
             
-def get_partition_paths(partitions, base_paths, key="partitions"):        
+def get_partition_paths(node_id, partitions, base_paths, key="partitions"):        
     partition_paths = []
-    for scheme in get_partition_scheme(partitions, base_paths, key):
+    for scheme in get_partition_scheme(node_id, partitions, base_paths, key):
         partition_paths.append(scheme[PARTITION_INDEX_PATH])
     return partition_paths
 
-def get_partition_scheme(partitions, base_paths, key="partitions"):        
+def get_partition_scheme(node_id, partitions, base_paths, key="partitions"):        
     partition_scheme = []
     for i in range(0, partitions):
         for j in range(0, len(base_paths)):
             new_partition_path = base_paths[j] + key + "/" + get_partition_folder(j, partitions, i) + "/"
-            partition_scheme.append((j, partitions, i, new_partition_path))
+            partition_scheme.append((node_id, j, partitions, i, new_partition_path))
     return partition_scheme
 
 def get_partition_folder(disks, partitions, index):