You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2014/04/02 06:12:14 UTC
[34/50] [abbrv] git commit: Changing benchmark scripts to support
local partitions on a cluster.
Changing benchmark scripts to support local partitions on a cluster.
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/ea297e72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/ea297e72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/ea297e72
Branch: refs/heads/prestonc/hash_join
Commit: ea297e72bd00d15649fa47991833d114c93c3821
Parents: 0b1033f
Author: Preston Carman <pr...@apache.org>
Authored: Mon Mar 17 18:08:54 2014 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Apr 1 20:56:25 2014 -0700
----------------------------------------------------------------------
.../scripts/weather_benchmark.py | 104 +++++++++----------
.../scripts/weather_data_files.py | 21 ++--
2 files changed, 60 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/ea297e72/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index 6374d38..fdd08ec 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -61,28 +61,32 @@ class WeatherBenchmark:
exit()
def print_local_partition_schemes(self, test, xml_save_path):
+ node_index = 0
+ virtual_partitions = get_local_virtual_partitions(self.partitions)
for i in self.partitions:
scheme = self.get_local_partition_scheme(test, xml_save_path, i)
- virtual_partitions = get_local_virtual_partitions(self.partitions)
- self.print_partition_schemes(virtual_partitions, scheme, test, i)
+ self.print_partition_schemes(virtual_partitions, scheme, test, i, node_index)
def print_cluster_partition_schemes(self, test, xml_save_path):
- scheme = self.get_cluster_partition_scheme(test, xml_save_path)
+ node_index = self.get_current_node_index()
virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
- self.print_partition_schemes(virtual_partitions, scheme, test, 0)
+ for i in self.partitions:
+ scheme = self.get_cluster_partition_scheme(test, xml_save_path, i)
+ self.print_partition_schemes(virtual_partitions, scheme, test, i, node_index)
- def print_partition_schemes(self, virtual_partitions, scheme, test, partitions):
+ def print_partition_schemes(self, virtual_partitions, scheme, test, partitions, node_id):
print
print "---------------- Partition Scheme --------------------"
print " Test: " + test
print " Virtual Partitions: " + str(virtual_partitions)
print " Disks: " + str(len(self.base_paths))
print " Partitions: " + str(partitions)
+ print " Node Id: " + str(node_id)
if len(scheme) > 0:
folder_length = len(scheme[0][3]) + 5
- row_format = "{:>5} {:>5} {:<" + str(folder_length) + "} {:<" + str(folder_length) + "}"
- HEADER = ("Index", "Link", "Data Path", "Link Path")
+ row_format = "{:>5} {:>5} {:>5} {:<" + str(folder_length) + "} {:<" + str(folder_length) + "}"
+ HEADER = ("Disk", "Index", "Link", "Data Path", "Link Path")
print row_format.format(*HEADER)
for row in scheme:
print row_format.format(*row)
@@ -93,14 +97,14 @@ class WeatherBenchmark:
def get_local_partition_scheme(self, test, xml_save_path, partition):
scheme = []
virtual_partitions = get_local_virtual_partitions(self.partitions)
- data_schems = get_partition_scheme(virtual_partitions, self.base_paths)
+ data_schemes = get_partition_scheme(0, virtual_partitions, self.base_paths)
- link_base_schemes = get_partition_scheme(partition, self.base_paths, "data_links/" + test)
+ link_base_schemes = get_partition_scheme(0, partition, self.base_paths, "data_links/" + test)
# Match link paths to real data paths.
offset = 0
group_size = len(data_schemes) / len(link_base_schemes)
- for link_disk, link_virtual, link_index, link_path in enumerate(link_base_schemes):
- for data_disk, data_virtual, data_index, data_path in enumerate(data_schemes):
+ for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes:
+ for data_node, data_disk, data_virtual, data_index, data_path in data_schemes:
if test == "local_speed_up" and offset <= data_index and data_index < offset + group_size:
scheme.append([data_index, link_index, data_path, link_path])
elif test == "local_batch_scale_out" and data_index == link_index:
@@ -108,43 +112,39 @@ class WeatherBenchmark:
offset += group_size
return scheme
- def get_cluster_partition_scheme(self, test, xml_save_path):
+ def get_cluster_partition_scheme(self, test, xml_save_path, partition):
node_index = self.get_current_node_index()
if node_index == -1:
print "Unknown host."
return
scheme = []
+ local_virtual_partitions = get_local_virtual_partitions(self.partitions)
virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
- data_paths = get_partition_paths(virtual_partitions, self.base_paths)
- link_base_paths = get_cluster_link_paths(len(self.nodes), self.base_paths, "data_links/" + test)
+ data_schemes = get_partition_scheme(node_index, virtual_partitions, self.base_paths)
+ link_base_schemes = get_cluster_link_scheme(len(self.nodes), partition, self.base_paths, "data_links/" + test)
# Match link paths to real data paths.
- link_base_paths.sort()
- for link_index, link_path in enumerate(link_base_paths):
+ for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes:
# Prep
- link_offset = link_index % len(self.nodes)
- disk_offset = link_index // len(self.nodes)
if test == "speed_up":
- group_size = len(data_paths) / (link_offset + 1) / (len(self.base_paths))
+ group_size = virtual_partitions / (link_node + 1)
elif test == "batch_scale_out":
- group_size = len(data_paths) / len(self.nodes) / (len(self.base_paths))
+ group_size = virtual_partitions / len(self.nodes)
else:
print "Unknown test."
return
- node_offset = group_size * node_index
- for j in range(disk_offset):
- node_offset += len(data_paths) / (len(self.base_paths))
+ group_size = group_size / link_virtual
+ node_offset = group_size * (node_index * local_virtual_partitions)
+ node_offset += group_size * link_index
has_data = True
- if link_offset < node_index:
+ if link_node < node_index:
has_data = False
-
# Make links
- data_paths.sort()
- for data_index, data_path in enumerate(data_paths):
- if has_data and node_offset <= data_index and data_index < node_offset + group_size:
- scheme.append([data_index, link_index, data_path, link_path])
- scheme.append([-1, link_index, "", link_path])
+ for data_node, data_disk, data_virtual, data_index, data_path in data_schemes:
+ if has_data and data_disk == link_disk and node_offset <= data_index and data_index < node_offset + group_size:
+ scheme.append([link_disk, data_index, link_index, data_path, link_path])
+ scheme.append([link_disk, -1, link_index, "", link_path])
return scheme
def build_data_links(self, xml_save_path):
@@ -156,8 +156,9 @@ class WeatherBenchmark:
scheme = self.get_local_partition_scheme(test, xml_save_path, i)
self.build_data_links_scheme(scheme)
elif test in self.BENCHMARK_CLUSTER_TESTS:
- scheme = self.get_cluster_partition_scheme(test, xml_save_path)
- self.build_data_links_scheme(scheme)
+ for i in self.partitions:
+ scheme = self.get_cluster_partition_scheme(test, xml_save_path, i)
+ self.build_data_links_scheme(scheme)
else:
print "Unknown test."
exit()
@@ -207,13 +208,14 @@ class WeatherBenchmark:
'''Determine the data_link path for cluster query files and copy with
new location for collection.'''
partitions = self.dataset.get_partitions()[0]
- for i in range(len(self.nodes)):
- query_path = get_cluster_query_path(self.base_paths, test, i)
- prepare_path(query_path, reset)
-
- # Copy query files.
- partition_paths = get_cluster_link_paths_for_node(i, self.base_paths, "data_links/" + test)
- self.copy_and_replace_query(query_path, partition_paths)
+ for i in self.partitions:
+ for j in range(len(self.nodes)):
+ query_path = get_cluster_query_path(self.base_paths, test, i, j)
+ prepare_path(query_path, reset)
+
+ # Copy query files.
+ partition_paths = get_cluster_link_paths_for_node(i, self.base_paths, "data_links/" + test)
+ self.copy_and_replace_query(query_path, partition_paths)
def copy_local_query_files(self, test, reset):
'''Determine the data_link path for local query files and copy with
@@ -223,7 +225,7 @@ class WeatherBenchmark:
prepare_path(query_path, reset)
# Copy query files.
- partition_paths = get_partition_paths(i, self.base_paths, "data_links/" + test)
+ partition_paths = get_partition_paths(0, i, self.base_paths, "data_links/" + test)
self.copy_and_replace_query(query_path, partition_paths)
def copy_and_replace_query(self, query_path, replacement_list):
@@ -255,34 +257,26 @@ class WeatherBenchmark:
print "Unknown test."
exit()
-def get_cluster_link_paths(nodes, base_paths, key="partitions"):
+def get_cluster_link_scheme(nodes, partition, base_paths, key="partitions"):
link_paths = []
for i in range(0, nodes):
- new_link_path = get_cluster_link_paths_for_node(i, base_paths, key)
+ new_link_path = get_partition_scheme(i, partition, base_paths, key + "/" + str(i) + "nodes")
link_paths.extend(new_link_path)
return link_paths
-def get_cluster_link_paths_for_node(node_id, base_paths, key="partitions"):
- link_paths = []
- for j in range(0, len(base_paths)):
- new_link_path = base_paths[j] + key + "/" + str(node_id) + "nodes/"
- link_paths.append(new_link_path)
- return link_paths
-
def get_local_query_path(base_paths, test, partition):
return base_paths[0] + "queries/" + test + "/" + get_local_query_folder(len(base_paths), partition) + "/"
def get_local_query_folder(disks, partitions):
return "d" + str(disks) + "_p" + str(partitions)
-def get_cluster_query_path(base_paths, test, nodes):
- return base_paths[0] + "queries/" + test + "/" + str(nodes) + "nodes/"
+def get_cluster_query_path(base_paths, test, partition, nodes):
+ return base_paths[0] + "queries/" + test + "/" + str(nodes) + "nodes/" + get_local_query_folder(len(base_paths), partition) + "/"
def get_cluster_virtual_partitions(nodes, partitions):
- if len(partitions) != 1:
- print "Cluster configurations must only have one partition."
- exit()
- return calculate_partitions(range(len(nodes), 0, -1))
+ vp = get_local_virtual_partitions(partitions)
+ vn = calculate_partitions(range(len(nodes), 0, -1))
+ return vp * vn
def get_local_virtual_partitions(partitions):
return calculate_partitions(partitions)
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/ea297e72/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
index a68cba1..64e19d6 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
@@ -99,7 +99,7 @@ class WeatherDataFiles:
# Initialize the partition paths.
partition_sizes = []
- partition_paths = get_partition_paths(partitions, base_paths)
+ partition_paths = get_partition_paths(0, partitions, base_paths)
for path in partition_paths:
partition_sizes.append(0)
# Make sure the xml folder is available.
@@ -319,24 +319,25 @@ class WeatherDataFiles:
# Index values of each field details.
-PARTITION_INDEX_DISK = 0
-PARTITION_INDEX_VIRTUAL = 1
-PARTITION_INDEX = 2
-PARTITION_INDEX_PATH = 3
-PARTITION_HEADER = ("Disk", "Virtual", "Index", "Path")
+PARTITION_INDEX_NODE = 0
+PARTITION_INDEX_DISK = 1
+PARTITION_INDEX_VIRTUAL = 2
+PARTITION_INDEX = 3
+PARTITION_INDEX_PATH = 4
+PARTITION_HEADER = ("Node", "Disk", "Virtual", "Index", "Path")
-def get_partition_paths(partitions, base_paths, key="partitions"):
+def get_partition_paths(node_id, partitions, base_paths, key="partitions"):
partition_paths = []
- for scheme in get_partition_scheme(partitions, base_paths, key):
+ for scheme in get_partition_scheme(node_id, partitions, base_paths, key):
partition_paths.append(scheme[PARTITION_INDEX_PATH])
return partition_paths
-def get_partition_scheme(partitions, base_paths, key="partitions"):
+def get_partition_scheme(node_id, partitions, base_paths, key="partitions"):
partition_scheme = []
for i in range(0, partitions):
for j in range(0, len(base_paths)):
new_partition_path = base_paths[j] + key + "/" + get_partition_folder(j, partitions, i) + "/"
- partition_scheme.append((j, partitions, i, new_partition_path))
+ partition_scheme.append((node_id, j, partitions, i, new_partition_path))
return partition_scheme
def get_partition_folder(disks, partitions, index):