You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2014/04/02 06:12:10 UTC

[30/50] [abbrv] git commit: Updated the method to partition the weather data.

Updated the method to partition the weather data.

The new method is per xml file using a round robin method.


Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/ef2f36ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/ef2f36ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/ef2f36ac

Branch: refs/heads/prestonc/hash_join
Commit: ef2f36ac38d5fd7951ec36aa2707a71e6ed6a6b4
Parents: f60f885
Author: Preston Carman <pr...@apache.org>
Authored: Tue Mar 25 16:39:22 2014 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Apr 1 20:56:25 2014 -0700

----------------------------------------------------------------------
 .../scripts/weather_benchmark.py                |  2 +-
 .../scripts/weather_data_files.py               | 54 ++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/ef2f36ac/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index 2077c10..be95ef8 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -140,7 +140,7 @@ class WeatherBenchmark:
                 print "Unknown test."
                 return
             group_size = group_size / link_virtual
-            node_offset = group_size * (node_index * local_virtual_partitions)
+            node_offset = group_size * (node_index * partition)
             node_offset += group_size * link_index
             has_data = True
             if link_node < node_index:

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/ef2f36ac/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
index 64e19d6..a7fb691 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
@@ -105,6 +105,60 @@ class WeatherDataFiles:
             # Make sure the xml folder is available.
             prepare_path(path, reset)
 
+        import fnmatch
+        import os
+        
+        # copy stations and sensors into each partition
+        current_sensor_partition = 0
+        current_station_partition = 0
+        self.open_progress_data()
+        row_count = len(self.progress_data)
+        for row in range(0, row_count):
+            row_contents = self.progress_data[row].rsplit(self.SEPERATOR)
+            file_name = row_contents[self.INDEX_DATA_FILE_NAME]
+            station_id = os.path.basename(file_name).split('.')[0]
+               
+            # Copy sensor files
+            type = "sensors"
+            file_path = build_base_save_folder(save_path, station_id, type) + station_id
+            for root, dirnames, filenames in os.walk(file_path):
+                for filename in fnmatch.filter(filenames, '*.xml'):
+                    xml_path = os.path.join(root, filename)
+                    new_file_base = build_base_save_folder(partition_paths[current_sensor_partition], station_id, type) + station_id
+                    if not os.path.isdir(new_file_base):
+                        os.makedirs(new_file_base)
+                    shutil.copyfile(xml_path, new_file_base + "/" + filename)
+                    current_sensor_partition += 1
+                    if current_sensor_partition >= len(partition_paths):
+                        current_sensor_partition = 0
+            
+            # Copy station files
+            type = "stations"
+            file_path = build_base_save_folder(save_path, station_id, type) + station_id + ".xml"
+            new_file_base = build_base_save_folder(partition_paths[current_station_partition], station_id, type)
+            new_file_path = new_file_base + station_id + ".xml"
+            if os.path.isfile(file_path):
+                if not os.path.isdir(new_file_base):
+                    os.makedirs(new_file_base)
+                shutil.copyfile(file_path, new_file_path)
+            current_station_partition += 1
+            if current_station_partition >= len(partition_paths):
+                current_station_partition = 0
+
+    
+    def copy_to_n_partitions_by_station(self, save_path, partitions, base_paths, reset):
+        """Once the initial data has been generated, the data can be copied into a set number of partitions. """
+        if (len(base_paths) == 0):
+            return
+        
+        # Initialize the partition paths.
+        partition_sizes = []
+        partition_paths = get_partition_paths(0, partitions, base_paths)
+        for path in partition_paths:
+            partition_sizes.append(0)
+            # Make sure the xml folder is available.
+            prepare_path(path, reset)
+
         # copy stations and sensors into each partition
         current_partition = 0
         csv_sorted = self.get_csv_in_partition_order()