You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2014/04/02 06:11:43 UTC

[03/50] [abbrv] git commit: Adding options to build out symbolic links to partitioned data for specific benchmark tests. Also have a method to copy queries over and change the collection references to the symbolic links.

Adding options to build out symbolic links to partitioned data for specific benchmark tests. Also have a method to copy queries over and change the collection references to the symbolic links.


Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/b43fe330
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/b43fe330
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/b43fe330

Branch: refs/heads/prestonc/hash_join
Commit: b43fe330fdbda9701374e786e2c82f85a3898065
Parents: a09d1f5
Author: Preston Carman <pr...@apache.org>
Authored: Sun Feb 23 17:38:13 2014 -0800
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Apr 1 20:56:23 2014 -0700

----------------------------------------------------------------------
 .../scripts/weather_benchmark.py                | 269 +++++++++++++++++++
 .../noaa-ghcn-daily/scripts/weather_cli.py      |  28 +-
 .../noaa-ghcn-daily/scripts/weather_config.py   |  23 +-
 .../scripts/weather_data_files.py               |   7 +-
 .../noaa-ghcn-daily/scripts/weather_example.xml |   2 +
 .../scripts/weather_example_cluster.xml         |  56 ++++
 6 files changed, 364 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/b43fe330/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
new file mode 100644
index 0000000..68c93b3
--- /dev/null
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -0,0 +1,269 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import glob
+import os.path
+import linecache
+import distutils.core
+import fileinput
+import socket
+
+from weather_config import *
+from weather_data_files import *
+from collections import OrderedDict
+
+# Weather data files created to manage the conversion process.
+# Allows partition and picking up where you left off.
+#
+# benchmark_name/
+#   data/
+#   queries/
+#   logs/
+class WeatherBenchmark:
+
+    QUERY_REPLACEMENT_KEY = "/tmp/1.0_partition_ghcnd_all_xml/"
+    QUERY_MASTER_FOLDER = "../queries/"
+    QUERY_FILE_LIST = ["q00.xq", "q01.xq", "q02.xq", "q03.xq", "q04.xq", "q05.xq"] 
+    BENCHMARK_LOCAL_TESTS = ["local_speed_up"] 
+    BENCHMARK_CLUSTER_TESTS = ["speed_up", "batch_scale_out"] 
+    QUERY_COLLECTIONS = ["sensors", "stations"]
+
+    SEPERATOR = "|"
+    
+    def __init__(self, base_paths, partitions, dataset, nodes):
+        self.base_paths = base_paths
+        self.partitions = partitions
+        self.dataset = dataset
+        self.nodes = nodes
+        
+    def build_data_links(self, xml_save_path):
+        if (len(self.base_paths) == 0):
+            return
+        for test in self.dataset.get_tests():
+            if test in self.BENCHMARK_LOCAL_TESTS:
+                self.build_local_data_links(test, xml_save_path)
+            elif test in self.BENCHMARK_CLUSTER_TESTS:
+                self.build_cluster_data_links(test, xml_save_path)
+            else:
+                print "Unknown test."
+                exit()
+            
+    def build_local_data_links(self, test, xml_save_path):
+        virtual_partitions = get_local_virtual_partitions(self.partitions)
+        data_paths = get_partition_paths(virtual_partitions, self.base_paths)
+        for i in self.partitions:
+            link_base_paths = get_partition_paths(i, self.base_paths, "data_links/" + test)
+            # Match link paths to real data paths.
+            offset = 0
+            group_size = len(data_paths) / len(link_base_paths)
+            print "g " + str(group_size)
+            print link_base_paths
+            print data_paths
+            for link_index, link_path in enumerate(link_base_paths):
+                for data_index, data_path in  enumerate(data_paths):
+                    print index, offset, group_size, link_index, data_index
+                    if offset <= data_index and data_index < offset + group_size:
+                        self.add_collection_links_for(data_path, link_path, data_index)
+                offset += group_size
+    
+    def build_cluster_data_links(self, test, xml_save_path):
+        node_index = self.get_current_node_index()
+        if node_index == -1:
+            print "Unknown host."
+            return 
+        
+        virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions)
+        data_paths = get_partition_paths(virtual_partitions, self.base_paths)
+        link_base_paths = get_cluster_link_paths(len(self.nodes), self.base_paths, "data_links/" + test)
+
+        # Match link paths to real data paths.
+        link_base_paths.sort()
+        for link_index, link_path in enumerate(link_base_paths):
+            if os.path.isdir(link_path):
+                shutil.rmtree(link_path)
+            # Prep
+            link_offset = link_index % len(self.nodes)
+            disk_offset = link_index // len(self.nodes)
+            if test == "speed_up":
+                group_size = len(data_paths) / (link_offset + 1) / (len(self.base_paths))
+            elif test == "batch_scale_out":
+                group_size = len(data_paths) / len(self.nodes) / (len(self.base_paths))
+            else:
+                print "Unknown test."
+                return
+            node_offset = group_size * node_index
+            for j in range(disk_offset):
+                node_offset += len(data_paths) / (len(self.base_paths))
+            has_data = True
+            if link_offset < node_index:
+                has_data = False
+                    
+            # Make links
+            data_paths.sort()
+            for data_index, data_path in enumerate(data_paths):
+                if has_data and node_offset <= data_index and data_index < node_offset + group_size:
+                    self.add_collection_links_for(data_path, link_path, data_index)
+            self.add_collection_links_for("", link_path, -1)
+    
+    def get_current_node_index(self):
+        found = False
+        node_index = 0
+        for machine in self.nodes:
+            if socket.gethostname() == machine.get_node_name():
+                found = True
+                break
+            node_index += 1
+    
+        if found:
+            return node_index
+        else:
+            return -1
+    
+    def add_collection_links_for(self, real_path, link_path, index):
+        for collection in self.QUERY_COLLECTIONS:
+            collection_path = link_path + "/" + collection
+            if not os.path.isdir(collection_path):
+                os.makedirs(collection_path)
+            if index >= 0:
+                os.symlink(real_path + "/" + collection, collection_path + "/index" + str(index))
+            
+    def get_partition_folders(self, base_path):
+        glob.glob(base_path + "partitions/d*_p*_i*")
+            
+#         test_data_path = self.base_path + "/" + self.test + "/data"
+#         if not os.path.isdir(test_data_path):
+#             os.makedirs(test_data_path)
+#     
+#         if self.test == "local_speed_up":
+#             for i in range(virtual_partitions):
+#                 # one virtual partition per disk
+#                 split = 0
+#                 for j in range(len(base_paths)):
+#                     # for each disk look at each partition
+#                     for index, path in enumerate(partition_list):
+#                         offset = partitions * j
+#                         group = partitions / (i + 1)
+#                         
+#                         if (group) * split + offset <= index and index < (group) * (split + 1) + offset:
+#                             split += 1
+#                         
+#                         test_partition_path = test_data_path + "/p" + str(i + 1) + ".i" + str(split) + ".d" + str(j + 1)
+#                         if not os.path.isdir(test_partition_path):
+#                             os.makedirs(test_partition_path)
+#                         os.symlink(path, test_partition_path + "/index" + str(index))
+        
+
+    def copy_query_files(self):
+        for test in self.dataset.get_tests():
+            if test in self.BENCHMARK_LOCAL_TESTS:
+                self.copy_local_query_files(test)
+            elif test in self.BENCHMARK_CLUSTER_TESTS:
+                self.copy_cluster_query_files(test)
+            else:
+                print "Unknown test."
+                exit()
+            
+    def copy_cluster_query_files(self, test):
+        partitions = self.dataset.get_partitions()[0]
+        for i in range(len(self.nodes)):
+            query_path = get_cluster_query_path(self.base_paths, test, i)
+        
+            if not os.path.isdir(query_path):
+                os.makedirs(query_path)
+        
+            # Copy query files.
+            node_partitions = get_partition_paths(partitions, self.base_paths, "data_links/" + test + "/" + str(i) + "nodes")
+            self.copy_and_replace_query(query_path, node_partitions)
+
+    def copy_local_query_files(self, test):
+        for i in self.partitions:
+            query_path = get_local_query_path(self.base_paths, test, i)
+        
+            if not os.path.isdir(query_path):
+                os.makedirs(query_path)
+    
+            # Copy query files.
+            self.copy_and_replace_query(query_path, get_partition_paths(i, self.base_paths))
+
+    def copy_and_replace_query(self, query_path, replacement_list):
+        '''Copy the query files over to the query_path and replace the path
+        for the where the collection data is located.'''
+        for query_file in self.QUERY_FILE_LIST:
+            shutil.copyfile(self.QUERY_MASTER_FOLDER + query_file, query_path + query_file)
+        
+            # Make a search replace for each collection.
+            for collection in self.QUERY_COLLECTIONS:
+                replacement_list_with_type = []
+                for replace in replacement_list:
+                    replacement_list_with_type.append(replace + collection)
+
+                replace_string = self.SEPERATOR.join(replacement_list_with_type)
+                for line in fileinput.input(query_path + query_file, True):
+                    sys.stdout.write(line.replace(self.QUERY_REPLACEMENT_KEY + collection, replace_string))
+                    
+    def get_number_of_slices(self):
+        print self.dataset
+        if len(self.dataset.get_tests()) == 0:
+            print "No test has been defined in config file."
+        else:
+            for test in self.dataset.get_tests():
+                print "test = " + test
+                if test in self.BENCHMARK_LOCAL_TESTS:
+                    return get_local_virtual_partitions(self.partitions)
+                elif test in self.BENCHMARK_CLUSTER_TESTS:
+                    return get_cluster_virtual_partitions(self.nodes, self.partitions)
+                else:
+                    print "Unknown test."
+                    exit()
+
+
+def get_cluster_link_paths(nodes, base_paths, key="partitions"):        
+    link_paths = []
+    for i in range(0, nodes):
+        for j in range(0, len(base_paths)):
+            new_link_path = base_paths[j] + key + "/" + str(i) + "nodes/"
+            link_paths.append(new_link_path)
+    return link_paths
+
+
+
+def get_local_query_path(base_paths, test, partition):        
+    return base_paths[0] + "queries/" + test + "/" + get_local_query_folder(len(base_paths), partition) + "/"
+
+def get_local_query_folder(disks, partitions):        
+    return "d" + str(disks) + "_p" + str(partitions)
+
+def get_cluster_query_path(base_paths, test, nodes):        
+    return base_paths[0] + "queries/" + test + "/" + str(nodes) + "nodes/"
+
+def get_cluster_virtual_partitions(nodes, partitions):
+    if len(partitions) != 1:
+        print "Cluster configurations must only have one partition."
+        exit()
+    return calculate_partitions(range(len(nodes), 0, -1))
+
+def get_local_virtual_partitions(partitions):
+    return calculate_partitions(partitions)
+
+def calculate_partitions(list):
+    x = 1
+    for i in list:
+        if x % i != 0:
+            if i % x == 0:
+                x = i
+            else:
+                x *= i
+    return x

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/b43fe330/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
index c69133e..92145a2 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py
@@ -21,6 +21,7 @@ from weather_data_files import *
 from weather_download_files import *
 from weather_convert_to_xml import *
 from weather_config import *
+from weather_benchmark import *
 
 DEBUG_OUTPUT = False
 COMPRESSED = False
@@ -78,7 +79,7 @@ def main(argv):
                 print 'Error: Argument must be a file name for --file (-f).'
                 sys.exit()
         elif opt in ('-l', "--locality"):
-            if arg in ("download", "progress_file", "sensor_build", "station_build", "partition", "test_links", "statistics"):
+            if arg in ("download", "progress_file", "sensor_build", "station_build", "partition", "test_links",  "queries", "statistics"):
                 section = arg
             else:
                 print 'Error: Argument must be a string for --locality (-l) and a valid locality.'
@@ -205,26 +206,27 @@ def main(argv):
         dataset_folder = "/dataset-" + dataset.get_name()
         progress_file = config.get_save_path() + dataset_folder + "/_data_progress.csv"
         data = WeatherDataFiles(ghcnd_data_dly_path, progress_file)
+
         base_paths = []
         for paths in dataset.get_save_paths():
             base_paths.append(paths + dataset_folder + "/")
-
+        benchmark = WeatherBenchmark(base_paths, dataset.get_partitions(), dataset, config.get_node_machine_list())
+        
         if section in ("all", "partition"):
-            for partition in dataset.get_partitions():
-                print 'Processing the partition section (' + dataset.get_name() + ':d' + str(len(base_paths)) + ':p' + str(partition) + ').'
-                data.reset()
-                data.copy_to_n_partitions(xml_data_save_path, partition, base_paths)
+            slices = benchmark.get_number_of_slices()
+            print 'Processing the partition section (' + dataset.get_name() + ':d' + str(len(base_paths)) + ':s' + str(slices) + ').'
+            data.reset()
+            data.copy_to_n_partitions(xml_data_save_path, slices, base_paths)
     
         if section in ("all", "test_links"):
             # TODO determine current node 
-            if test and partitions > 0 and virtual_partitions > 0 and nodes > -1:
-                print 'Processing the test links section.'
-                data.reset()
-                data.create_test_links(config.get_save_path(), xml_data_save_path, test, nodes, partitions, virtual_partitions, base_paths)
-            else:
-                print 'Error: Not enough information for this section.'
-                sys.exit()
+            print 'Processing the test links section (' + dataset.get_name() + ').'
+            benchmark.build_data_links(xml_data_save_path)
 
+        if section in ("all", "queries"):
+            print 'Processing the queries section (' + dataset.get_name() + ').'
+            benchmark.copy_query_files()
+    
 #     if section in ("statistics"):
 #         print 'Processing the statistics section.'
 #         data.print_progress_file_stats(convert)

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/b43fe330/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
index fa12342..9d2e289 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py
@@ -42,7 +42,8 @@ class WeatherConfig:
             name = self.get_dataset_name(node)
             save_paths = self.get_dataset_save_paths(node)
             partitions = self.get_dataset_partitions(node)
-            nodes.append(Dataset(name, save_paths, partitions))
+            tests = self.get_dataset_tests(node)
+            nodes.append(Dataset(name, save_paths, partitions, tests))
         return nodes
 
 
@@ -64,16 +65,22 @@ class WeatherConfig:
 
     def get_dataset_save_paths(self, node):
         paths = []
-        for node in node.getElementsByTagName("save_path"):
-            paths.append(self.get_text(node))
+        for item in node.getElementsByTagName("save_path"):
+            paths.append(self.get_text(item))
         return paths
 
     def get_dataset_partitions(self, node):
         paths = []
-        for node in node.getElementsByTagName("partitions_per_path"):
-            paths.append(int(self.get_text(node)))
+        for item in node.getElementsByTagName("partitions_per_path"):
+            paths.append(int(self.get_text(item)))
         return paths
 
+    def get_dataset_tests(self, node):
+        tests = []
+        for item in node.getElementsByTagName("test"):
+            tests.append(self.get_text(item))
+        return tests
+
     def get_text(self, xml_node):
         rc = []
         for node in xml_node.childNodes:
@@ -96,10 +103,11 @@ class Machine:
         return self.id + "(" + self.ip + ")"
     
 class Dataset:
-    def __init__(self, name, save_paths, partitions):
+    def __init__(self, name, save_paths, partitions, tests):
         self.name = name
         self.save_paths = save_paths
         self.partitions = partitions
+        self.tests = tests
     
     def get_name(self):
         return self.name
@@ -110,6 +118,9 @@ class Dataset:
     def get_partitions(self):
         return self.partitions
     
+    def get_tests(self):
+        return self.tests
+    
     def __repr__(self):
         return self.name + ":" + str(self.save_paths) + ":" + str(self.partitions)
     

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/b43fe330/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
index be85c69..42dea81 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
@@ -347,12 +347,15 @@ class WeatherDataFiles:
                 break
         return columns[self.INDEX_DATA_FILE_NAME]
     
-def get_partition_paths(partitions, base_paths):        
+def get_partition_paths(partitions, base_paths, key = "partitions"):        
     partition_paths = []
     for i in range(0, partitions):
         for j in range(0, len(base_paths)):
-            new_partition_path = base_paths[j] + "partitions/" + str(len(base_paths)) + "disk/d" + str(j) +"_p" + str(partitions) + "_i" + str(i) + "/"
+            new_partition_path = base_paths[j] + key + "/" + get_partition_folder(j, partitions, i) + "/"
             partition_paths.append(new_partition_path)
     return partition_paths
 
+def get_partition_folder(disks, partitions, index):        
+    return "d" + str(disks) +"_p" + str(partitions) + "_i" + str(index)
+
 

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/b43fe330/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example.xml
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example.xml b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example.xml
index b8e60fd..7af1e9d 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example.xml
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example.xml
@@ -24,6 +24,7 @@
     </node>
     <dataset>
         <name>tiny</name>
+        <test>local_speed_up</test>
         <save_path>/data</save_path>
         <partitions_per_path>1</partitions_per_path>
         <partitions_per_path>2</partitions_per_path>
@@ -32,6 +33,7 @@
     </dataset>
     <dataset>
         <name>small</name>
+        <test>local_speed_up</test>
         <save_path>/data</save_path>
         <partitions_per_path>4</partitions_per_path>
         <partitions_per_path>8</partitions_per_path>

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/b43fe330/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example_cluster.xml
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example_cluster.xml b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example_cluster.xml
new file mode 100644
index 0000000..34be0df
--- /dev/null
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example_cluster.xml
@@ -0,0 +1,56 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<data xmlns="data">
+    <name>Cluster Example</name>
+    <save_path>/data</save_path>
+    <package>all</package>
+    <node>
+        <name>machine1</name>
+        <ip_address>127.0.0.1</ip_address>
+    </node>
+    <node>
+        <name>machine2</name>
+        <ip_address>127.0.0.2</ip_address>
+    </node>
+    <node>
+        <name>machine3</name>
+        <ip_address>127.0.0.3</ip_address>
+    </node>
+    <node>
+        <name>machine4</name>
+        <ip_address>127.0.0.4</ip_address>
+    </node>
+    <node>
+        <name>machine5</name>
+        <ip_address>127.0.0.5</ip_address>
+    </node>
+    <dataset>
+        <name>tiny-1drive</name>
+        <test>speed_up</test>
+        <test>batch_scale_out</test>
+        <save_path>/data</save_path>
+        <partitions_per_path>4</partitions_per_path>
+    </dataset>
+    <dataset>
+        <name>small-2drives</name>
+        <test>speed_up</test>
+        <test>batch_scale_out</test>
+        <save_path>/data</save_path>
+        <save_path>/data2</save_path>
+        <partitions_per_path>2</partitions_per_path>
+    </dataset>
+</data>