You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/12/15 22:38:39 UTC

[GitHub] [arrow] westonpace commented on a change in pull request #11911: ARROW-15019: [Python] Add bindings for new dataset writing options

westonpace commented on a change in pull request #11911:
URL: https://github.com/apache/arrow/pull/11911#discussion_r769920049



##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
     assert not extra_file.exists()
 
 
+def _generate_random_int_array(size=4, min=1, max=10):
+    return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+                               unique_records=None):
+    data = []
+    column_names = []
+    if unique_records is None:
+        unique_records = records_per_row
+    for i in range(num_of_columns):
+        data.append(_generate_random_int_array(size=records_per_row,
+                                               min=1,
+                                               max=unique_records))
+        column_names.append("c" + str(i))
+    return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):

Review comment:
       Maybe...
   ```
   files = list(pathlib.Path(base_directory).glob('**/*.parquet'))
   return len(files)
   ```

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
     assert not extra_file.exists()
 
 
+def _generate_random_int_array(size=4, min=1, max=10):
+    return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+                               unique_records=None):
+    data = []
+    column_names = []
+    if unique_records is None:
+        unique_records = records_per_row
+    for i in range(num_of_columns):
+        data.append(_generate_random_int_array(size=records_per_row,
+                                               min=1,
+                                               max=unique_records))
+        column_names.append("c" + str(i))
+    return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+    file_dirs = os.listdir(base_directory)
+    number_of_files = 0
+    for _, file_dir in enumerate(file_dirs):
+        sub_dir_path = base_directory / file_dir
+        number_of_files += len(os.listdir(sub_dir_path))
+    return number_of_files
+
+
+def _get_compare_pair(data_source, record_batch):

Review comment:
       Nit: It's not obvious to me from the name what `_get_compare_pair` does.  Since it is only used in one test right now, maybe move the definition of this function inside that test.

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
     assert not extra_file.exists()
 
 
+def _generate_random_int_array(size=4, min=1, max=10):
+    return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+                               unique_records=None):
+    data = []
+    column_names = []
+    if unique_records is None:
+        unique_records = records_per_row
+    for i in range(num_of_columns):
+        data.append(_generate_random_int_array(size=records_per_row,
+                                               min=1,
+                                               max=unique_records))
+        column_names.append("c" + str(i))
+    return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+    file_dirs = os.listdir(base_directory)
+    number_of_files = 0
+    for _, file_dir in enumerate(file_dirs):
+        sub_dir_path = base_directory / file_dir
+        number_of_files += len(os.listdir(sub_dir_path))
+    return number_of_files
+
+
+def _get_compare_pair(data_source, record_batch):
+    num_of_files_generated = _get_num_of_files_generated(
+        base_directory=data_source)
+    number_of_unique_rows = len(pa.compute.unique(record_batch[0]))
+    return num_of_files_generated, number_of_unique_rows
+
+
+def test_write_dataset_max_rows_per_file(tempdir):
+    directory = tempdir / 'ds'
+    max_rows_per_file = 10
+    max_rows_per_group = 10
+    num_of_columns = 2
+    records_per_row = 35
+
+    data, column_names = _generate_data_and_columns(num_of_columns,
+                                                    records_per_row)
+
+    record_batch = pa.record_batch(data=data, names=column_names)
+
+    sub_directory = directory / 'onewrite'
+
+    ds.write_dataset(record_batch, sub_directory, format="parquet",
+                     max_rows_per_file=max_rows_per_file,
+                     max_rows_per_group=max_rows_per_group)
+
+    files_in_dir = os.listdir(sub_directory)
+
+    # number of partitions with max_rows and the partition with the remainder
+    expected_partitions = len(data[0]) // max_rows_per_file + 1
+    expected_row_combination = [max_rows_per_file
+                                for i in range(expected_partitions - 1)] \
+        + [len(data[0]) - ((expected_partitions - 1) * max_rows_per_file)]
+
+    # test whether the expected amount of files are written
+    assert len(files_in_dir) == expected_partitions
+
+    # compute the number of rows per each file written
+    result_row_combination = []
+    for _, f_file in enumerate(files_in_dir):
+        f_path = sub_directory / str(f_file)
+        dataset = ds.dataset(f_path, format="parquet")
+        result_row_combination.append(dataset.to_table().shape[0])
+
+    # test whether the generated files have the expected number of rows
+    assert len(expected_row_combination) == len(result_row_combination)
+    assert sum(expected_row_combination) == sum(result_row_combination)
+
+
+def test_write_dataset_min_rows_per_group(tempdir):
+    directory = tempdir / 'ds'
+    min_rows_per_group = 10
+    max_rows_per_group = 20
+    num_of_columns = 2
+    records_per_row = 49
+    unique_records = 5

Review comment:
       I think a better test for `min_rows_per_group` would be writing multiple batches where `records_per_row` is less than `min_rows_per_group`.  For example, if `min_rows_per_group = 10` and `max_rows_per_group = 20` and `records_per_row = 4` then:
   
   Writing 2 batches -> 1 file 1 row group
   Writing 3 batches -> 1 file 2 row groups
   

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
     assert not extra_file.exists()
 
 
+def _generate_random_int_array(size=4, min=1, max=10):
+    return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,

Review comment:
       Nit: `records_per_row` is a confusing name.  Maybe it should be `records_per_column`, or perhaps just `num_rows`?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
     assert not extra_file.exists()
 
 
+def _generate_random_int_array(size=4, min=1, max=10):
+    return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+                               unique_records=None):
+    data = []
+    column_names = []
+    if unique_records is None:
+        unique_records = records_per_row
+    for i in range(num_of_columns):
+        data.append(_generate_random_int_array(size=records_per_row,
+                                               min=1,
+                                               max=unique_records))
+        column_names.append("c" + str(i))
+    return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+    file_dirs = os.listdir(base_directory)
+    number_of_files = 0
+    for _, file_dir in enumerate(file_dirs):
+        sub_dir_path = base_directory / file_dir
+        number_of_files += len(os.listdir(sub_dir_path))
+    return number_of_files
+
+
+def _get_compare_pair(data_source, record_batch):
+    num_of_files_generated = _get_num_of_files_generated(
+        base_directory=data_source)
+    number_of_unique_rows = len(pa.compute.unique(record_batch[0]))
+    return num_of_files_generated, number_of_unique_rows
+
+
+def test_write_dataset_max_rows_per_file(tempdir):
+    directory = tempdir / 'ds'
+    max_rows_per_file = 10
+    max_rows_per_group = 10
+    num_of_columns = 2
+    records_per_row = 35
+
+    data, column_names = _generate_data_and_columns(num_of_columns,
+                                                    records_per_row)
+
+    record_batch = pa.record_batch(data=data, names=column_names)
+
+    sub_directory = directory / 'onewrite'
+
+    ds.write_dataset(record_batch, sub_directory, format="parquet",
+                     max_rows_per_file=max_rows_per_file,
+                     max_rows_per_group=max_rows_per_group)
+
+    files_in_dir = os.listdir(sub_directory)
+
+    # number of partitions with max_rows and the partition with the remainder
+    expected_partitions = len(data[0]) // max_rows_per_file + 1

Review comment:
       Instead of `len(data[0])` you could reuse `records_per_row` (or `num_rows`, if it is renamed as suggested elsewhere), which would be a bit clearer.

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
     assert not extra_file.exists()
 
 
+def _generate_random_int_array(size=4, min=1, max=10):
+    return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+                               unique_records=None):
+    data = []
+    column_names = []
+    if unique_records is None:
+        unique_records = records_per_row
+    for i in range(num_of_columns):
+        data.append(_generate_random_int_array(size=records_per_row,
+                                               min=1,
+                                               max=unique_records))
+        column_names.append("c" + str(i))
+    return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+    file_dirs = os.listdir(base_directory)
+    number_of_files = 0
+    for _, file_dir in enumerate(file_dirs):
+        sub_dir_path = base_directory / file_dir
+        number_of_files += len(os.listdir(sub_dir_path))
+    return number_of_files
+
+
+def _get_compare_pair(data_source, record_batch):
+    num_of_files_generated = _get_num_of_files_generated(
+        base_directory=data_source)
+    number_of_unique_rows = len(pa.compute.unique(record_batch[0]))
+    return num_of_files_generated, number_of_unique_rows
+
+
+def test_write_dataset_max_rows_per_file(tempdir):
+    directory = tempdir / 'ds'
+    max_rows_per_file = 10
+    max_rows_per_group = 10
+    num_of_columns = 2
+    records_per_row = 35
+
+    data, column_names = _generate_data_and_columns(num_of_columns,
+                                                    records_per_row)
+
+    record_batch = pa.record_batch(data=data, names=column_names)
+
+    sub_directory = directory / 'onewrite'

Review comment:
       Nit: You could just use `directory` (or even `tempdir`).  Each test will get its own temporary directory so these subdirs aren't really required.

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
     assert not extra_file.exists()
 
 
+def _generate_random_int_array(size=4, min=1, max=10):
+    return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+                               unique_records=None):
+    data = []
+    column_names = []
+    if unique_records is None:
+        unique_records = records_per_row
+    for i in range(num_of_columns):
+        data.append(_generate_random_int_array(size=records_per_row,
+                                               min=1,
+                                               max=unique_records))
+        column_names.append("c" + str(i))
+    return data, column_names
+
+
+def _get_num_of_files_generated(base_directory):
+    file_dirs = os.listdir(base_directory)
+    number_of_files = 0
+    for _, file_dir in enumerate(file_dirs):
+        sub_dir_path = base_directory / file_dir
+        number_of_files += len(os.listdir(sub_dir_path))
+    return number_of_files
+
+
+def _get_compare_pair(data_source, record_batch):
+    num_of_files_generated = _get_num_of_files_generated(
+        base_directory=data_source)
+    number_of_unique_rows = len(pa.compute.unique(record_batch[0]))
+    return num_of_files_generated, number_of_unique_rows
+
+
+def test_write_dataset_max_rows_per_file(tempdir):
+    directory = tempdir / 'ds'
+    max_rows_per_file = 10
+    max_rows_per_group = 10
+    num_of_columns = 2
+    records_per_row = 35
+
+    data, column_names = _generate_data_and_columns(num_of_columns,
+                                                    records_per_row)
+
+    record_batch = pa.record_batch(data=data, names=column_names)
+
+    sub_directory = directory / 'onewrite'
+
+    ds.write_dataset(record_batch, sub_directory, format="parquet",
+                     max_rows_per_file=max_rows_per_file,
+                     max_rows_per_group=max_rows_per_group)
+
+    files_in_dir = os.listdir(sub_directory)
+
+    # number of partitions with max_rows and the partition with the remainder
+    expected_partitions = len(data[0]) // max_rows_per_file + 1
+    expected_row_combination = [max_rows_per_file
+                                for i in range(expected_partitions - 1)] \
+        + [len(data[0]) - ((expected_partitions - 1) * max_rows_per_file)]

Review comment:
       There is some complexity here to calculate `expected_row_combination`, but at the end of the day all you use is `len(expected_row_combination)`, which is `expected_partitions`, and `sum(expected_row_combination)`, which is `records_per_row`.  Maybe you don't need `expected_row_combination`?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
     assert not extra_file.exists()
 
 
+def _generate_random_int_array(size=4, min=1, max=10):
+    return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+                               unique_records=None):

Review comment:
       Is the number of `unique_records` relevant for these tests?

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3621,6 +3621,204 @@ def compare_tables_ignoring_order(t1, t2):
     assert not extra_file.exists()
 
 
+def _generate_random_int_array(size=4, min=1, max=10):
+    return np.random.randint(min, max, size)
+
+
+def _generate_data_and_columns(num_of_columns, records_per_row,
+                               unique_records=None):
+    data = []
+    column_names = []
+    if unique_records is None:
+        unique_records = records_per_row
+    for i in range(num_of_columns):
+        data.append(_generate_random_int_array(size=records_per_row,
+                                               min=1,
+                                               max=unique_records))
+        column_names.append("c" + str(i))
+    return data, column_names

Review comment:
       Perhaps return a record batch?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org