You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2014/04/02 06:12:04 UTC
[24/50] [abbrv] git commit: Made better variable names and fixed local partition scheme.
Made better variable names and fixed local partition scheme.
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/a74fd60e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/a74fd60e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/a74fd60e
Branch: refs/heads/prestonc/hash_join
Commit: a74fd60e1d9e81e30613d20dfb2ecb7ee3b4362d
Parents: 7566f5e
Author: Preston Carman <pr...@apache.org>
Authored: Tue Mar 18 15:58:44 2014 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Apr 1 20:56:25 2014 -0700
----------------------------------------------------------------------
.../scripts/weather_benchmark.py | 58 +++++++++++---------
1 file changed, 32 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/a74fd60e/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index fdd08ec..2077c10 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -63,16 +63,16 @@ class WeatherBenchmark:
def print_local_partition_schemes(self, test, xml_save_path):
node_index = 0
virtual_partitions = get_local_virtual_partitions(self.partitions)
- for i in self.partitions:
- scheme = self.get_local_partition_scheme(test, xml_save_path, i)
- self.print_partition_schemes(virtual_partitions, scheme, test, i, node_index)
+ for p in self.partitions:
+ scheme = self.get_local_partition_scheme(test, xml_save_path, p)
+ self.print_partition_schemes(virtual_partitions, scheme, test, p, node_index)
def print_cluster_partition_schemes(self, test, xml_save_path):
node_index = self.get_current_node_index()
virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
- for i in self.partitions:
- scheme = self.get_cluster_partition_scheme(test, xml_save_path, i)
- self.print_partition_schemes(virtual_partitions, scheme, test, i, node_index)
+ for p in self.partitions:
+ scheme = self.get_cluster_partition_scheme(test, xml_save_path, p)
+ self.print_partition_schemes(virtual_partitions, scheme, test, p, node_index)
def print_partition_schemes(self, virtual_partitions, scheme, test, partitions, node_id):
print
@@ -98,18 +98,23 @@ class WeatherBenchmark:
scheme = []
virtual_partitions = get_local_virtual_partitions(self.partitions)
data_schemes = get_partition_scheme(0, virtual_partitions, self.base_paths)
-
link_base_schemes = get_partition_scheme(0, partition, self.base_paths, "data_links/" + test)
+
# Match link paths to real data paths.
- offset = 0
group_size = len(data_schemes) / len(link_base_schemes)
- for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes:
- for data_node, data_disk, data_virtual, data_index, data_path in data_schemes:
- if test == "local_speed_up" and offset <= data_index and data_index < offset + group_size:
- scheme.append([data_index, link_index, data_path, link_path])
- elif test == "local_batch_scale_out" and data_index == link_index:
- scheme.append([data_index, link_index, data_path, link_path])
- offset += group_size
+ for d in range(len(self.base_paths)):
+ offset = 0
+ for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes:
+ if d == link_disk:
+ # Only consider a single disk at a time.
+ for data_node, data_disk, data_virtual, data_index, data_path in data_schemes:
+ if test == "local_speed_up" and data_disk == link_disk \
+ and offset <= data_index and data_index < offset + group_size:
+ scheme.append([data_disk, data_index, link_index, data_path, link_path])
+ elif test == "local_batch_scale_out" and data_disk == link_disk \
+ and data_index == link_index:
+ scheme.append([data_disk, data_index, link_index, data_path, link_path])
+ offset += group_size
return scheme
def get_cluster_partition_scheme(self, test, xml_save_path, partition):
@@ -142,7 +147,8 @@ class WeatherBenchmark:
has_data = False
# Make links
for date_node, data_disk, data_virtual, data_index, data_path in data_schemes:
- if has_data and data_disk == link_disk and node_offset <= data_index and data_index < node_offset + group_size:
+ if has_data and data_disk == link_disk \
+ and node_offset <= data_index and data_index < node_offset + group_size:
scheme.append([link_disk, data_index, link_index, data_path, link_path])
scheme.append([link_disk, -1, link_index, "", link_path])
return scheme
@@ -166,7 +172,7 @@ class WeatherBenchmark:
def build_data_links_scheme(self, scheme):
"""Build all the data links based on the scheme information."""
link_path_cleared = []
- for (data_index, partition, data_path, link_path) in scheme:
+ for (data_disk, data_index, partition, data_path, link_path) in scheme:
if link_path not in link_path_cleared and os.path.isdir(link_path):
shutil.rmtree(link_path)
link_path_cleared.append(link_path)
@@ -208,24 +214,24 @@ class WeatherBenchmark:
'''Determine the data_link path for cluster query files and copy with
new location for collection.'''
partitions = self.dataset.get_partitions()[0]
- for i in self.partitions:
- for j in range(len(self.nodes)):
- query_path = get_cluster_query_path(self.base_paths, test, i, j)
+ for n in range(len(self.nodes)):
+ for p in self.partitions:
+ query_path = get_cluster_query_path(self.base_paths, test, p, n)
prepare_path(query_path, reset)
# Copy query files.
- partition_paths = get_cluster_link_paths_for_node(i, self.base_paths, "data_links/" + test)
+ partition_paths = get_partition_paths(n, p, self.base_paths, "data_links/" + test + "/" + str(n) + "nodes")
self.copy_and_replace_query(query_path, partition_paths)
def copy_local_query_files(self, test, reset):
'''Determine the data_link path for local query files and copy with
new location for collection.'''
- for i in self.partitions:
- query_path = get_local_query_path(self.base_paths, test, i)
+ for p in self.partitions:
+ query_path = get_local_query_path(self.base_paths, test, p)
prepare_path(query_path, reset)
# Copy query files.
- partition_paths = get_partition_paths(0, i, self.base_paths, "data_links/" + test)
+ partition_paths = get_partition_paths(0, p, self.base_paths, "data_links/" + test)
self.copy_and_replace_query(query_path, partition_paths)
def copy_and_replace_query(self, query_path, replacement_list):
@@ -259,8 +265,8 @@ class WeatherBenchmark:
def get_cluster_link_scheme(nodes, partition, base_paths, key="partitions"):
link_paths = []
- for i in range(0, nodes):
- new_link_path = get_partition_scheme(i, partition, base_paths, key + "/" + str(i) + "nodes")
+ for n in range(0, nodes):
+ new_link_path = get_partition_scheme(n, partition, base_paths, key + "/" + str(n) + "nodes")
link_paths.extend(new_link_path)
return link_paths