You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2014/04/02 06:12:04 UTC

[24/50] [abbrv] git commit: Improved variable names and fixed the local partition scheme.

Improved variable names and fixed the local partition scheme.


Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/a74fd60e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/a74fd60e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/a74fd60e

Branch: refs/heads/prestonc/hash_join
Commit: a74fd60e1d9e81e30613d20dfb2ecb7ee3b4362d
Parents: 7566f5e
Author: Preston Carman <pr...@apache.org>
Authored: Tue Mar 18 15:58:44 2014 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Apr 1 20:56:25 2014 -0700

----------------------------------------------------------------------
 .../scripts/weather_benchmark.py                | 58 +++++++++++---------
 1 file changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/a74fd60e/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index fdd08ec..2077c10 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -63,16 +63,16 @@ class WeatherBenchmark:
     def print_local_partition_schemes(self, test, xml_save_path):
         node_index = 0
         virtual_partitions = get_local_virtual_partitions(self.partitions)
-        for i in self.partitions:
-            scheme = self.get_local_partition_scheme(test, xml_save_path, i)
-            self.print_partition_schemes(virtual_partitions, scheme, test, i, node_index)
+        for p in self.partitions:
+            scheme = self.get_local_partition_scheme(test, xml_save_path, p)
+            self.print_partition_schemes(virtual_partitions, scheme, test, p, node_index)
         
     def print_cluster_partition_schemes(self, test, xml_save_path):
         node_index = self.get_current_node_index()
         virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
-        for i in self.partitions:
-            scheme = self.get_cluster_partition_scheme(test, xml_save_path, i)
-            self.print_partition_schemes(virtual_partitions, scheme, test, i, node_index)
+        for p in self.partitions:
+            scheme = self.get_cluster_partition_scheme(test, xml_save_path, p)
+            self.print_partition_schemes(virtual_partitions, scheme, test, p, node_index)
         
     def print_partition_schemes(self, virtual_partitions, scheme, test, partitions, node_id):
         print
@@ -98,18 +98,23 @@ class WeatherBenchmark:
         scheme = []
         virtual_partitions = get_local_virtual_partitions(self.partitions)
         data_schemes = get_partition_scheme(0, virtual_partitions, self.base_paths)
-        
         link_base_schemes = get_partition_scheme(0, partition, self.base_paths, "data_links/" + test)
+
         # Match link paths to real data paths.
-        offset = 0
         group_size = len(data_schemes) / len(link_base_schemes)
-        for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes:
-            for data_node, data_disk, data_virtual, data_index, data_path in data_schemes:
-                if test == "local_speed_up" and offset <= data_index and data_index < offset + group_size:
-                    scheme.append([data_index, link_index, data_path, link_path])
-                elif test == "local_batch_scale_out" and data_index == link_index:
-                    scheme.append([data_index, link_index, data_path, link_path])
-            offset += group_size
+        for d in range(len(self.base_paths)):
+            offset = 0
+            for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes:
+                if d == link_disk:
+                    # Only consider a single disk at a time.
+                    for data_node, data_disk, data_virtual, data_index, data_path in data_schemes:
+                        if test == "local_speed_up" and data_disk == link_disk \
+                                and offset <= data_index and data_index < offset + group_size:
+                            scheme.append([data_disk, data_index, link_index, data_path, link_path])
+                        elif test == "local_batch_scale_out" and data_disk == link_disk \
+                                and data_index == link_index:
+                            scheme.append([data_disk, data_index, link_index, data_path, link_path])
+                    offset += group_size
         return scheme
     
     def get_cluster_partition_scheme(self, test, xml_save_path, partition):
@@ -142,7 +147,8 @@ class WeatherBenchmark:
                 has_data = False
             # Make links
             for date_node, data_disk, data_virtual, data_index, data_path in data_schemes:
-                if has_data and data_disk == link_disk and node_offset <= data_index and data_index < node_offset + group_size:
+                if has_data and data_disk == link_disk \
+                        and node_offset <= data_index and data_index < node_offset + group_size:
                     scheme.append([link_disk, data_index, link_index, data_path, link_path])
             scheme.append([link_disk, -1, link_index, "", link_path])
         return scheme
@@ -166,7 +172,7 @@ class WeatherBenchmark:
     def build_data_links_scheme(self, scheme):
         """Build all the data links based on the scheme information."""
         link_path_cleared = []
-        for (data_index, partition, data_path, link_path) in scheme:
+        for (data_disk, data_index, partition, data_path, link_path) in scheme:
             if link_path not in link_path_cleared and os.path.isdir(link_path):
                 shutil.rmtree(link_path)
                 link_path_cleared.append(link_path)
@@ -208,24 +214,24 @@ class WeatherBenchmark:
         '''Determine the data_link path for cluster query files and copy with
         new location for collection.'''
         partitions = self.dataset.get_partitions()[0]
-        for i in self.partitions:
-            for j in range(len(self.nodes)):
-                query_path = get_cluster_query_path(self.base_paths, test, i, j)
+        for n in range(len(self.nodes)):
+            for p in self.partitions:
+                query_path = get_cluster_query_path(self.base_paths, test, p, n)
                 prepare_path(query_path, reset)
             
                 # Copy query files.
-                partition_paths = get_cluster_link_paths_for_node(i, self.base_paths, "data_links/" + test)
+                partition_paths = get_partition_paths(n, p, self.base_paths, "data_links/" + test + "/" + str(n) + "nodes")
                 self.copy_and_replace_query(query_path, partition_paths)
 
     def copy_local_query_files(self, test, reset):
         '''Determine the data_link path for local query files and copy with
         new location for collection.'''
-        for i in self.partitions:
-            query_path = get_local_query_path(self.base_paths, test, i)
+        for p in self.partitions:
+            query_path = get_local_query_path(self.base_paths, test, p)
             prepare_path(query_path, reset)
     
             # Copy query files.
-            partition_paths = get_partition_paths(0, i, self.base_paths, "data_links/" + test)
+            partition_paths = get_partition_paths(0, p, self.base_paths, "data_links/" + test)
             self.copy_and_replace_query(query_path, partition_paths)
 
     def copy_and_replace_query(self, query_path, replacement_list):
@@ -259,8 +265,8 @@ class WeatherBenchmark:
 
 def get_cluster_link_scheme(nodes, partition, base_paths, key="partitions"):        
     link_paths = []
-    for i in range(0, nodes):
-        new_link_path = get_partition_scheme(i, partition, base_paths, key + "/" + str(i) + "nodes")
+    for n in range(0, nodes):
+        new_link_path = get_partition_scheme(n, partition, base_paths, key + "/" + str(n) + "nodes")
         link_paths.extend(new_link_path)
     return link_paths