You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2014/04/02 02:21:21 UTC
[02/16] git commit: Updated the method to partition the weather data.
Updated the method to partition the weather data.
The new method distributes the data one XML file at a time, assigning files to partitions in round-robin order.
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/ec0e5573
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/ec0e5573
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/ec0e5573
Branch: refs/heads/prestonc/hash_join
Commit: ec0e5573d14a0efe6afa5d7415f3298aa54608ea
Parents: bb1ec57
Author: Preston Carman <pr...@apache.org>
Authored: Tue Mar 25 16:39:22 2014 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Mar 25 16:39:22 2014 -0700
----------------------------------------------------------------------
.../scripts/weather_benchmark.py | 2 +-
.../scripts/weather_data_files.py | 54 ++++++++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/ec0e5573/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
index 2077c10..be95ef8 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py
@@ -140,7 +140,7 @@ class WeatherBenchmark:
print "Unknown test."
return
group_size = group_size / link_virtual
- node_offset = group_size * (node_index * local_virtual_partitions)
+ node_offset = group_size * (node_index * partition)
node_offset += group_size * link_index
has_data = True
if link_node < node_index:
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/ec0e5573/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
----------------------------------------------------------------------
diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
index 64e19d6..a7fb691 100644
--- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
+++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py
@@ -105,6 +105,60 @@ class WeatherDataFiles:
# Make sure the xml folder is available.
prepare_path(path, reset)
+ import fnmatch
+ import os
+
+ # copy stations and sensors into each partition
+ current_sensor_partition = 0
+ current_station_partition = 0
+ self.open_progress_data()
+ row_count = len(self.progress_data)
+ for row in range(0, row_count):
+ row_contents = self.progress_data[row].rsplit(self.SEPERATOR)
+ file_name = row_contents[self.INDEX_DATA_FILE_NAME]
+ station_id = os.path.basename(file_name).split('.')[0]
+
+ # Copy sensor files
+ type = "sensors"
+ file_path = build_base_save_folder(save_path, station_id, type) + station_id
+ for root, dirnames, filenames in os.walk(file_path):
+ for filename in fnmatch.filter(filenames, '*.xml'):
+ xml_path = os.path.join(root, filename)
+ new_file_base = build_base_save_folder(partition_paths[current_sensor_partition], station_id, type) + station_id
+ if not os.path.isdir(new_file_base):
+ os.makedirs(new_file_base)
+ shutil.copyfile(xml_path, new_file_base + "/" + filename)
+ current_sensor_partition += 1
+ if current_sensor_partition >= len(partition_paths):
+ current_sensor_partition = 0
+
+ # Copy station files
+ type = "stations"
+ file_path = build_base_save_folder(save_path, station_id, type) + station_id + ".xml"
+ new_file_base = build_base_save_folder(partition_paths[current_station_partition], station_id, type)
+ new_file_path = new_file_base + station_id + ".xml"
+ if os.path.isfile(file_path):
+ if not os.path.isdir(new_file_base):
+ os.makedirs(new_file_base)
+ shutil.copyfile(file_path, new_file_path)
+ current_station_partition += 1
+ if current_station_partition >= len(partition_paths):
+ current_station_partition = 0
+
+
+ def copy_to_n_partitions_by_station(self, save_path, partitions, base_paths, reset):
+ """Once the initial data has been generated, the data can be copied into a set number of partitions. """
+ if (len(base_paths) == 0):
+ return
+
+ # Initialize the partition paths.
+ partition_sizes = []
+ partition_paths = get_partition_paths(0, partitions, base_paths)
+ for path in partition_paths:
+ partition_sizes.append(0)
+ # Make sure the xml folder is available.
+ prepare_path(path, reset)
+
# copy stations and sensors into each partition
current_partition = 0
csv_sorted = self.get_csv_in_partition_order()