Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/05 02:26:13 UTC

[GitHub] [beam] peridotml opened a new pull request, #23030: Draft: WriteParquetBatched

peridotml opened a new pull request, #23030:
URL: https://github.com/apache/beam/pull/23030

   This is related to #22813. @TheNeuralBit I put up a draft. Let me know if this would still be worth adding to the SDK or if there is already a way to do this.
   
   This is an example of the pipeline:
   ```python
   with TestPipeline() as p:
     _ = (
         p
         | RunInference(...)
         # BatchToArrowTable turns a dict of vectors into a pa.Table, e.g.
         # {'id': [1, 2, 3], 'score': [0.9, 0.1, 0.8]} -> pa.Table
         | BatchToArrowTable()
         | WriteToParquetBatched(
             path, self.SCHEMA, num_shards=1, shard_name_template=''))
   ```
   
   ### Motivation
   * No need to un-batch into a series of Python dicts, which hurts performance.
   * Models naturally return batches, so writing batches directly fits their output.
   * Not added yet, but the schema could be inferred from the first batch (this might break something, but it worked on Dataflow).
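
To make the first point concrete, the sketch below shows the round trip that an element-wise writer forces on batched model output: the batch is exploded into one dict per row, and the rows are then regrouped into columns before writing. Plain Python dicts and lists stand in for model output and `pa.Table`; `unbatch` and `rebatch` are hypothetical helper names, not Beam APIs.

```python
def unbatch(batch):
    """Explode a dict of equal-length columns into one dict per row."""
    names = list(batch)
    return [dict(zip(names, values)) for values in zip(*batch.values())]


def rebatch(rows, names):
    """Rebuild the columnar layout, as an element-wise sink must before writing."""
    return {name: [row[name] for row in rows] for name in names}


batch = {'id': [1, 2, 3], 'score': [0.9, 0.1, 0.8]}
rows = unbatch(batch)                      # one Python dict per row
round_tripped = rebatch(rows, list(batch))  # back to columns again
```

Writing the batch directly skips both conversions.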
   
   ### Open Questions
   1. Controlling `row_group_size` would need to be handled upstream.
   2. There is a lot of duplicated code. There may be some smart ways to refactor it (let me know).
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] peridotml commented on a diff in pull request #23030: Draft: WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
peridotml commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r967882132


##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -83,6 +86,67 @@ def process(self, table, with_filename=False):
         yield row
 
 
+class _RowDictionariesToArrowTable(DoFn):
+  """ A DoFn that consumes python dictionarys and yields a pyarrow table."""
+  def __init__(
+      self,
+      schema,
+      row_group_buffer_size=64 * 1024 * 1024,
+      record_batch_size=1000):
+    self._schema = schema
+    self._row_group_buffer_size = row_group_buffer_size
+    self._buffer = [[] for _ in range(len(schema.names))]
+    self._buffer_size = record_batch_size
+    self._record_batches = []
+    self._record_batches_byte_size = 0
+
+  def process(self, row):
+    if len(self._buffer[0]) >= self._buffer_size:
+      self._flush_buffer()
+
+    if self._record_batches_byte_size >= self._row_group_buffer_size:

Review Comment:
   There is also `get_total_buffer_size()` on both `pa.RecordBatch` and `pa.Table`, which would save some lines of code and is actually a lot faster, although the difference is pretty negligible in absolute terms.
   
   Test results
   * 900 rows x 10 columns: 830 ns vs 9.94 µs
   * 900 rows x 100 columns: 9.8 µs vs 100 µs
   
   Unfortunately, the counts from the two approaches don't line up perfectly, and the current code is already pretty fast. Just wanted to flag this as an option!





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #23030: Draft: WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r964915520


##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -612,3 +613,111 @@ def _flush_buffer(self):
         if b is not None:
           size = size + b.size
     self._record_batches_byte_size = self._record_batches_byte_size + size
+
+
+class WriteToParquetBatched(PTransform):
+  """Initialize a WriteToParquetBatched transform.
+
+     Writes parquet files from a :class:`~apache_beam.pvalue.PCollection` of
+     batches. Each batch is a pa.Table. Schema must be specified like the example below.
+     """
+  def __init__(
+      self,
+      file_path_prefix,
+      schema=None,
+      codec='none',
+      use_deprecated_int96_timestamps=False,
+      use_compliant_nested_type=False,
+      file_name_suffix='',
+      num_shards=0,
+      shard_name_template=None,
+      mime_type='application/x-parquet',
+  ):
+    super().__init__()
+    self._sink = _BatchedParquetSink(
+        file_path_prefix=file_path_prefix,
+        schema=schema,
+        codec=codec,
+        use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
+        use_compliant_nested_type=use_compliant_nested_type,
+        file_name_suffix=file_name_suffix,
+        num_shards=num_shards,
+        shard_name_template=shard_name_template,
+        mime_type=mime_type,
+    )
+
+  def expand(self, pcoll):
+    return pcoll | Write(self._sink)
+
+  def display_data(self):
+    return {'sink_dd': self._sink}
+
+
+class _BatchedParquetSink(filebasedsink.FileBasedSink):
+  """A sink for parquet files from batches."""
+  def __init__(
+      self,
+      file_path_prefix,
+      schema,
+      codec,
+      use_deprecated_int96_timestamps,
+      use_compliant_nested_type,
+      file_name_suffix,
+      num_shards,
+      shard_name_template,
+      mime_type):
+    super().__init__(
+        file_path_prefix,
+        file_name_suffix=file_name_suffix,
+        num_shards=num_shards,
+        shard_name_template=shard_name_template,
+        coder=None,
+        mime_type=mime_type,
+        # Compression happens at the block level using the supplied codec, and
+        # not at the file level.
+        compression_type=CompressionTypes.UNCOMPRESSED)
+    self._schema = schema
+    self._codec = codec
+    if ARROW_MAJOR_VERSION == 1 and self._codec.lower() == "lz4":
+      raise ValueError(
+          "Due to ARROW-9424, writing with LZ4 compression is not supported in "
+          "pyarrow 1.x, please use a different pyarrow version or a different "
+          f"codec. Your pyarrow version: {pa.__version__}")
+    self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
+    if use_compliant_nested_type and ARROW_MAJOR_VERSION < 4:
+      raise ValueError(
+          "With ARROW-11497, use_compliant_nested_type is only supported in "
+          "pyarrow version >= 4.x, please use a different pyarrow version. "
+          f"Your pyarrow version: {pa.__version__}")
+    self._use_compliant_nested_type = use_compliant_nested_type
+    self._file_handle = None
+
+  def open(self, temp_path):
+    self._file_handle = super().open(temp_path)
+    if ARROW_MAJOR_VERSION < 4:
+      return pq.ParquetWriter(
+          self._file_handle,
+          self._schema,
+          compression=self._codec,
+          use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)
+    return pq.ParquetWriter(
+        self._file_handle,
+        self._schema,
+        compression=self._codec,
+        use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps,
+        use_compliant_nested_type=self._use_compliant_nested_type)

Review Comment:
   Could you try to re-use this for the element-wise Parquet sink? I think we could structure this similarly to ReadFromParquet and ReadFromParquetBatched. They both use the same underlying source that produces batches, but the former adds an additional transform to explode the batches into individual dictionaries: https://github.com/apache/beam/blob/26e5d1c254de24b217e9e049733a3da778d4aa7a/sdks/python/apache_beam/io/parquetio.py#L204-L205
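
The structure suggested here could look roughly like the sketch below: one batch-oriented sink does all the writing, and the element-wise path only adds a step that groups row dicts into batches first. This is plain Python with dicts of lists standing in for `pa.Table`; `BatchSink`, `rows_to_batches`, `write_batched`, and `write_rows` are hypothetical names, not the PR's or Beam's API.

```python
class BatchSink:
    """Stand-in for a batch-oriented sink: records every batch it is given."""
    def __init__(self):
        self.written = []

    def write_batch(self, batch):
        self.written.append(batch)


def rows_to_batches(rows, names, batch_size):
    """Group row dicts into columnar batches of at most batch_size rows."""
    buffer = []
    for row in rows:
        buffer.append(row)
        if len(buffer) == batch_size:
            yield {n: [r[n] for r in buffer] for n in names}
            buffer = []
    if buffer:  # flush the final, possibly smaller, batch
        yield {n: [r[n] for r in buffer] for n in names}


def write_batched(batches, sink):
    """Batched path: hand each batch straight to the sink."""
    for batch in batches:
        sink.write_batch(batch)


def write_rows(rows, names, sink, batch_size=2):
    """Element-wise path: convert rows to batches, then reuse the batched path."""
    write_batched(rows_to_batches(rows, names, batch_size), sink)


sink = BatchSink()
write_rows([{'id': 1}, {'id': 2}, {'id': 3}], ['id'], sink)
```

With this shape, only `rows_to_batches` is specific to the element-wise writer; everything that touches the file lives in one place.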





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #23030: Draft: WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r989469974


##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -565,22 +665,9 @@ def open(self, temp_path):
         use_compliant_nested_type=self._use_compliant_nested_type)
 
   def write_record(self, writer, value):
-    if len(self._buffer[0]) >= self._buffer_size:
-      self._flush_buffer()
-
-    if self._record_batches_byte_size >= self._row_group_buffer_size:
-      self._write_batches(writer)
-
-    # reorder the data in columnar format.
-    for i, n in enumerate(self._schema.names):
-      self._buffer[i].append(value[n])
+    writer.write_table(value)

Review Comment:
   nit: maybe change `value` to `table: pa.Table` to make the expectation explicit





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #23030: Draft: WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r989464309


##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -83,6 +86,67 @@ def process(self, table, with_filename=False):
         yield row
 
 
+class _RowDictionariesToArrowTable(DoFn):
+  """ A DoFn that consumes python dictionarys and yields a pyarrow table."""
+  def __init__(
+      self,
+      schema,
+      row_group_buffer_size=64 * 1024 * 1024,
+      record_batch_size=1000):
+    self._schema = schema
+    self._row_group_buffer_size = row_group_buffer_size
+    self._buffer = [[] for _ in range(len(schema.names))]
+    self._buffer_size = record_batch_size
+    self._record_batches = []
+    self._record_batches_byte_size = 0
+
+  def process(self, row):
+    if len(self._buffer[0]) >= self._buffer_size:
+      self._flush_buffer()
+
+    if self._record_batches_byte_size >= self._row_group_buffer_size:

Review Comment:
   Hm, the counts don't line up perfectly? How different are they? If they are different I would suspect the number provided by pyarrow is the correct one.
   
   I think this is worth digging into further, but I'd recommend filing an issue and doing it in a separate PR.  That way this one is mostly a no-op moving code around.





[GitHub] [beam] peridotml commented on a diff in pull request #23030: Draft: WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
peridotml commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r967867516


##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -83,6 +86,67 @@ def process(self, table, with_filename=False):
         yield row
 
 
+class _RowDictionariesToArrowTable(DoFn):
+  """ A DoFn that consumes python dictionarys and yields a pyarrow table."""
+  def __init__(
+      self,
+      schema,
+      row_group_buffer_size=64 * 1024 * 1024,
+      record_batch_size=1000):
+    self._schema = schema
+    self._row_group_buffer_size = row_group_buffer_size
+    self._buffer = [[] for _ in range(len(schema.names))]
+    self._buffer_size = record_batch_size
+    self._record_batches = []
+    self._record_batches_byte_size = 0
+
+  def process(self, row):
+    if len(self._buffer[0]) >= self._buffer_size:
+      self._flush_buffer()
+
+    if self._record_batches_byte_size >= self._row_group_buffer_size:
+      table = self._create_table()
+      yield table
+
+    # reorder the data in columnar format.
+    for i, n in enumerate(self._schema.names):
+      self._buffer[i].append(row[n])
+
+  def finish_bundle(self):
+    if len(self._buffer[0]) > 0:
+      self._flush_buffer()
+    if self._record_batches_byte_size > 0:
+      table = self._create_table()
+      yield window.GlobalWindows.windowed_value_at_end_of_window(table)
+
+  def display_data(self):
+    res = super().display_data()
+    res['row_group_buffer_size'] = str(self._row_group_buffer_size)
+    res['buffer_size'] = str(self._buffer_size)
+
+    return res
+
+  def _create_table(self):
+    table = pa.Table.from_batches(self._record_batches, schema=self._schema)
+    self._record_batches = []
+    self._record_batches_byte_size = 0
+    return table
+
+  def _flush_buffer(self):
+    arrays = [[] for _ in range(len(self._schema.names))]
+    for x, y in enumerate(self._buffer):
+      arrays[x] = pa.array(y, type=self._schema.types[x])
+      self._buffer[x] = []
+    rb = pa.RecordBatch.from_arrays(arrays, schema=self._schema)
+    self._record_batches.append(rb)
+    size = 0
+    for x in arrays:

Review Comment:
   Both `pa.RecordBatch` and `pa.Table` have the attribute `nbytes`. It could be simpler and more performant to just use that attribute. I couldn't find documentation confirming whether it was available before pyarrow `v1`.



##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -83,6 +86,67 @@ def process(self, table, with_filename=False):
         yield row
 
 
+class _RowDictionariesToArrowTable(DoFn):
+  """ A DoFn that consumes python dictionarys and yields a pyarrow table."""
+  def __init__(
+      self,
+      schema,
+      row_group_buffer_size=64 * 1024 * 1024,
+      record_batch_size=1000):
+    self._schema = schema
+    self._row_group_buffer_size = row_group_buffer_size
+    self._buffer = [[] for _ in range(len(schema.names))]
+    self._buffer_size = record_batch_size
+    self._record_batches = []
+    self._record_batches_byte_size = 0
+
+  def process(self, row):
+    if len(self._buffer[0]) >= self._buffer_size:
+      self._flush_buffer()
+
+    if self._record_batches_byte_size >= self._row_group_buffer_size:
+      table = self._create_table()
+      yield table
+
+    # reorder the data in columnar format.
+    for i, n in enumerate(self._schema.names):
+      self._buffer[i].append(row[n])
+
+  def finish_bundle(self):
+    if len(self._buffer[0]) > 0:
+      self._flush_buffer()
+    if self._record_batches_byte_size > 0:
+      table = self._create_table()
+      yield window.GlobalWindows.windowed_value_at_end_of_window(table)

Review Comment:
   It seems like we need to be careful with windows when using buffers inside of DoFns.
   
   I don't actually know if this works, but I looked at `BatchElements` and copied part of its approach. If there is a known working strategy, please let me know. I am not experienced with streaming.
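
For readers less familiar with the pattern under discussion: a DoFn that buffers elements has to emit whatever is left when the bundle ends, otherwise the trailing rows are lost. The sketch below mimics only that `process` / `finish_bundle` lifecycle in plain Python. `BufferingFn` is a hypothetical stand-in, not a Beam `DoFn`, and it does not address the windowing question raised above.

```python
class BufferingFn:
    """Mimics a buffering DoFn: emits full batches, flushes the rest at bundle end."""
    def __init__(self, batch_size):
        self._batch_size = batch_size
        self._buffer = []

    def process(self, row):
        # Emit a batch first if the buffer is already full, then buffer the row.
        if len(self._buffer) >= self._batch_size:
            yield self._flush()
        self._buffer.append(row)

    def finish_bundle(self):
        # Without this flush, rows still in the buffer would never be emitted.
        if self._buffer:
            yield self._flush()

    def _flush(self):
        batch, self._buffer = self._buffer, []
        return batch


fn = BufferingFn(batch_size=2)
emitted = []
for row in [1, 2, 3, 4, 5]:
    emitted.extend(fn.process(row))
before_finish = list(emitted)       # the last row is still buffered here
emitted.extend(fn.finish_bundle())  # the end-of-bundle flush emits it
```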



##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -83,6 +86,67 @@ def process(self, table, with_filename=False):
         yield row
 
 
+class _RowDictionariesToArrowTable(DoFn):
+  """ A DoFn that consumes python dictionarys and yields a pyarrow table."""
+  def __init__(
+      self,
+      schema,
+      row_group_buffer_size=64 * 1024 * 1024,
+      record_batch_size=1000):
+    self._schema = schema
+    self._row_group_buffer_size = row_group_buffer_size
+    self._buffer = [[] for _ in range(len(schema.names))]
+    self._buffer_size = record_batch_size
+    self._record_batches = []
+    self._record_batches_byte_size = 0
+
+  def process(self, row):
+    if len(self._buffer[0]) >= self._buffer_size:
+      self._flush_buffer()
+
+    if self._record_batches_byte_size >= self._row_group_buffer_size:

Review Comment:
   If we switched to using `nbytes` then this could be moved into the sink if that makes more sense.



##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -83,6 +86,67 @@ def process(self, table, with_filename=False):
         yield row
 
 
+class _RowDictionariesToArrowTable(DoFn):

Review Comment:
   I used `pa.Table` for consistency with reading batched parquets, but I wanted to bring up that `pa.RecordBatch` would also work.
   
   I don't have strong preferences either way. I would use it both ways. 



##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -565,22 +665,9 @@ def open(self, temp_path):
         use_compliant_nested_type=self._use_compliant_nested_type)
 
   def write_record(self, writer, value):
-    if len(self._buffer[0]) >= self._buffer_size:
-      self._flush_buffer()
-
-    if self._record_batches_byte_size >= self._row_group_buffer_size:
-      self._write_batches(writer)
-
-    # reorder the data in columnar format.
-    for i, n in enumerate(self._schema.names):
-      self._buffer[i].append(value[n])
+    writer.write_table(value)

Review Comment:
   I have used a variant of this before in which the schema is inferred from the first batch that comes through. This simplified my model inference scripts, because it avoided loading TensorFlow saved models just to get the output signature and convert it to pyarrow types.
   
   It worked locally and on Dataflow. Thought I would throw it out there.
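
The idea could be sketched as follows, with a dict mapping column names to Python types standing in for a pyarrow schema. `SchemaInferringWriter` is a hypothetical name, and the sketch assumes every column in a batch is non-empty. A real version would presumably derive the schema from the first `pa.Table` and open the Parquet writer lazily; that wiring is an assumption, not something this PR implements.

```python
class SchemaInferringWriter:
    """Infers a schema from the first batch and checks later batches against it."""
    def __init__(self):
        self.schema = None
        self.batches = []

    @staticmethod
    def _infer(batch):
        # Use the type of each column's first value as that column's type.
        return {name: type(values[0]) for name, values in batch.items()}

    def write(self, batch):
        inferred = self._infer(batch)
        if self.schema is None:
            self.schema = inferred  # the first batch fixes the schema
        elif inferred != self.schema:
            raise ValueError(f"schema mismatch: {inferred} != {self.schema}")
        self.batches.append(batch)


writer = SchemaInferringWriter()
writer.write({'id': [1, 2], 'score': [0.9, 0.1]})
writer.write({'id': [3], 'score': [0.8]})
```

The trade-off is that a schema mismatch only surfaces at write time, on whichever worker happens to see the offending batch.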





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #23030: Draft: WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r989473233


##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -83,6 +86,67 @@ def process(self, table, with_filename=False):
         yield row
 
 
+class _RowDictionariesToArrowTable(DoFn):
+  """ A DoFn that consumes python dictionarys and yields a pyarrow table."""
+  def __init__(
+      self,
+      schema,
+      row_group_buffer_size=64 * 1024 * 1024,
+      record_batch_size=1000):
+    self._schema = schema
+    self._row_group_buffer_size = row_group_buffer_size
+    self._buffer = [[] for _ in range(len(schema.names))]
+    self._buffer_size = record_batch_size
+    self._record_batches = []
+    self._record_batches_byte_size = 0
+
+  def process(self, row):
+    if len(self._buffer[0]) >= self._buffer_size:
+      self._flush_buffer()
+
+    if self._record_batches_byte_size >= self._row_group_buffer_size:
+      table = self._create_table()
+      yield table
+
+    # reorder the data in columnar format.
+    for i, n in enumerate(self._schema.names):
+      self._buffer[i].append(row[n])
+
+  def finish_bundle(self):
+    if len(self._buffer[0]) > 0:
+      self._flush_buffer()
+    if self._record_batches_byte_size > 0:
+      table = self._create_table()
+      yield window.GlobalWindows.windowed_value_at_end_of_window(table)

Review Comment:
   I think this is ok for now, at least it shouldn't be a regression from the current behavior.





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #23030: Draft: WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r989461981


##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -83,6 +86,67 @@ def process(self, table, with_filename=False):
         yield row
 
 
+class _RowDictionariesToArrowTable(DoFn):
+  """ A DoFn that consumes python dictionarys and yields a pyarrow table."""
+  def __init__(
+      self,
+      schema,
+      row_group_buffer_size=64 * 1024 * 1024,
+      record_batch_size=1000):
+    self._schema = schema
+    self._row_group_buffer_size = row_group_buffer_size
+    self._buffer = [[] for _ in range(len(schema.names))]
+    self._buffer_size = record_batch_size
+    self._record_batches = []
+    self._record_batches_byte_size = 0
+
+  def process(self, row):
+    if len(self._buffer[0]) >= self._buffer_size:
+      self._flush_buffer()
+
+    if self._record_batches_byte_size >= self._row_group_buffer_size:
+      table = self._create_table()
+      yield table
+
+    # reorder the data in columnar format.
+    for i, n in enumerate(self._schema.names):
+      self._buffer[i].append(row[n])
+
+  def finish_bundle(self):
+    if len(self._buffer[0]) > 0:
+      self._flush_buffer()
+    if self._record_batches_byte_size > 0:
+      table = self._create_table()
+      yield window.GlobalWindows.windowed_value_at_end_of_window(table)
+
+  def display_data(self):
+    res = super().display_data()
+    res['row_group_buffer_size'] = str(self._row_group_buffer_size)
+    res['buffer_size'] = str(self._buffer_size)
+
+    return res
+
+  def _create_table(self):
+    table = pa.Table.from_batches(self._record_batches, schema=self._schema)
+    self._record_batches = []
+    self._record_batches_byte_size = 0
+    return table
+
+  def _flush_buffer(self):
+    arrays = [[] for _ in range(len(self._schema.names))]
+    for x, y in enumerate(self._buffer):
+      arrays[x] = pa.array(y, type=self._schema.types[x])
+      self._buffer[x] = []
+    rb = pa.RecordBatch.from_arrays(arrays, schema=self._schema)
+    self._record_batches.append(rb)
+    size = 0
+    for x in arrays:

Review Comment:
   Note we test with all supported major versions of pyarrow in CI, so you don't need to worry about unintentionally breaking compatibility with an old version.
   
   Also, if support for pyarrow 0.x or 1.x is problematic we could go ahead and drop them. They are 2 years old at this point.
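
As context for the version discussion, the sink in this PR branches on the pyarrow major version when constructing the writer. One way to express that gating without duplicating the constructor call is to build the keyword arguments conditionally. The sketch below is plain Python; `major_version` and `writer_kwargs` are hypothetical helpers, and the threshold of 4 for `use_compliant_nested_type` is taken from the sink code quoted in this thread.

```python
def major_version(version):
    """Return the leading integer of a version string such as '4.0.1'."""
    return int(version.split('.')[0])


def writer_kwargs(version, codec, use_int96, use_compliant_nested_type):
    """Build writer keyword arguments, adding newer options only when supported."""
    kwargs = {
        'compression': codec,
        'use_deprecated_int96_timestamps': use_int96,
    }
    if major_version(version) >= 4:
        kwargs['use_compliant_nested_type'] = use_compliant_nested_type
    return kwargs


old = writer_kwargs('3.0.0', 'snappy', False, False)
new = writer_kwargs('9.0.0', 'snappy', False, True)
```

If support for the older pyarrow versions were dropped, the conditional would disappear entirely.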





[GitHub] [beam] github-actions[bot] commented on pull request #23030: Draft: WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #23030:
URL: https://github.com/apache/beam/pull/23030#issuecomment-1295710216

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @AnandInguva for label python.
   R: @ahmedabu98 for label io.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] peridotml commented on a diff in pull request #23030: Draft: WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
peridotml commented on code in PR #23030:
URL: https://github.com/apache/beam/pull/23030#discussion_r967534919


##########
sdks/python/apache_beam/io/parquetio.py:
##########
@@ -612,3 +613,111 @@ def _flush_buffer(self):
         if b is not None:
           size = size + b.size
     self._record_batches_byte_size = self._record_batches_byte_size + size
+
+
+class WriteToParquetBatched(PTransform):
+  """Initialize a WriteToParquetBatched transform.
+
+     Writes parquet files from a :class:`~apache_beam.pvalue.PCollection` of
+     batches. Each batch is a pa.Table. Schema must be specified like the example below.
+     """
+  def __init__(
+      self,
+      file_path_prefix,
+      schema=None,
+      codec='none',
+      use_deprecated_int96_timestamps=False,
+      use_compliant_nested_type=False,
+      file_name_suffix='',
+      num_shards=0,
+      shard_name_template=None,
+      mime_type='application/x-parquet',
+  ):
+    super().__init__()
+    self._sink = _BatchedParquetSink(
+        file_path_prefix=file_path_prefix,
+        schema=schema,
+        codec=codec,
+        use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
+        use_compliant_nested_type=use_compliant_nested_type,
+        file_name_suffix=file_name_suffix,
+        num_shards=num_shards,
+        shard_name_template=shard_name_template,
+        mime_type=mime_type,
+    )
+
+  def expand(self, pcoll):
+    return pcoll | Write(self._sink)
+
+  def display_data(self):
+    return {'sink_dd': self._sink}
+
+
+class _BatchedParquetSink(filebasedsink.FileBasedSink):
+  """A sink for parquet files from batches."""
+  def __init__(
+      self,
+      file_path_prefix,
+      schema,
+      codec,
+      use_deprecated_int96_timestamps,
+      use_compliant_nested_type,
+      file_name_suffix,
+      num_shards,
+      shard_name_template,
+      mime_type):
+    super().__init__(
+        file_path_prefix,
+        file_name_suffix=file_name_suffix,
+        num_shards=num_shards,
+        shard_name_template=shard_name_template,
+        coder=None,
+        mime_type=mime_type,
+        # Compression happens at the block level using the supplied codec, and
+        # not at the file level.
+        compression_type=CompressionTypes.UNCOMPRESSED)
+    self._schema = schema
+    self._codec = codec
+    if ARROW_MAJOR_VERSION == 1 and self._codec.lower() == "lz4":
+      raise ValueError(
+          "Due to ARROW-9424, writing with LZ4 compression is not supported in "
+          "pyarrow 1.x, please use a different pyarrow version or a different "
+          f"codec. Your pyarrow version: {pa.__version__}")
+    self._use_deprecated_int96_timestamps = use_deprecated_int96_timestamps
+    if use_compliant_nested_type and ARROW_MAJOR_VERSION < 4:
+      raise ValueError(
+          "With ARROW-11497, use_compliant_nested_type is only supported in "
+          "pyarrow version >= 4.x, please use a different pyarrow version. "
+          f"Your pyarrow version: {pa.__version__}")
+    self._use_compliant_nested_type = use_compliant_nested_type
+    self._file_handle = None
+
+  def open(self, temp_path):
+    self._file_handle = super().open(temp_path)
+    if ARROW_MAJOR_VERSION < 4:
+      return pq.ParquetWriter(
+          self._file_handle,
+          self._schema,
+          compression=self._codec,
+          use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps)
+    return pq.ParquetWriter(
+        self._file_handle,
+        self._schema,
+        compression=self._codec,
+        use_deprecated_int96_timestamps=self._use_deprecated_int96_timestamps,
+        use_compliant_nested_type=self._use_compliant_nested_type)

Review Comment:
   @TheNeuralBit Brilliant idea! I will implement it this weekend. 





[GitHub] [beam] codecov[bot] commented on pull request #23030: Draft: WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #23030:
URL: https://github.com/apache/beam/pull/23030#issuecomment-1236493263

   # [Codecov](https://codecov.io/gh/apache/beam/pull/23030?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#23030](https://codecov.io/gh/apache/beam/pull/23030?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e0acb72) into [master](https://codecov.io/gh/apache/beam/commit/31561e2ff13147aa80f9f811e2a94ebe57b25374?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (31561e2) will **decrease** coverage by `0.03%`.
   > The diff coverage is `91.89%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #23030      +/-   ##
   ==========================================
   - Coverage   73.71%   73.67%   -0.04%     
   ==========================================
     Files         714      714              
     Lines       95240    95257      +17     
   ==========================================
   - Hits        70203    70178      -25     
   - Misses      23740    23782      +42     
     Partials     1297     1297              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.47% <91.89%> (-0.06%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/23030?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/io/parquetio.py](https://codecov.io/gh/apache/beam/pull/23030/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vcGFycXVldGlvLnB5) | `94.61% <91.89%> (-0.55%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/statecache.py](https://codecov.io/gh/apache/beam/pull/23030/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc3RhdGVjYWNoZS5weQ==) | `89.69% <0.00%> (-6.47%)` | :arrow_down: |
   | [...dks/python/apache\_beam/metrics/monitoring\_infos.py](https://codecov.io/gh/apache/beam/pull/23030/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWV0cmljcy9tb25pdG9yaW5nX2luZm9zLnB5) | `92.50% <0.00%> (-4.50%)` | :arrow_down: |
   | [...python/apache\_beam/runners/worker/worker\_status.py](https://codecov.io/gh/apache/beam/pull/23030/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvd29ya2VyX3N0YXR1cy5weQ==) | `76.66% <0.00%> (-3.05%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/23030/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `95.12% <0.00%> (-2.44%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/23030/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.57% <0.00%> (-1.70%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/23030/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `93.28% <0.00%> (-0.75%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/23030/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.44% <0.00%> (-0.65%)` | :arrow_down: |
   | [sdks/python/apache\_beam/runners/direct/executor.py](https://codecov.io/gh/apache/beam/pull/23030/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZXhlY3V0b3IucHk=) | `96.46% <0.00%> (-0.55%)` | :arrow_down: |
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/23030/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `91.71% <0.00%> (-0.31%)` | :arrow_down: |
   | ... and [5 more](https://codecov.io/gh/apache/beam/pull/23030/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [beam] peridotml commented on pull request #23030: Draft: WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
peridotml commented on PR #23030:
URL: https://github.com/apache/beam/pull/23030#issuecomment-1236485939

   @TheNeuralBit here is an initial draft. Let me know what you think and I will work on it more! 




[GitHub] [beam] esadler-hbo commented on pull request #23030: Add WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
esadler-hbo commented on PR #23030:
URL: https://github.com/apache/beam/pull/23030#issuecomment-1297712591

   @TheNeuralBit I had some time to spend on this. I responded to your code review and improved the docstrings.




[GitHub] [beam] TheNeuralBit commented on pull request #23030: Add WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #23030:
URL: https://github.com/apache/beam/pull/23030#issuecomment-1298875109

   Thank you!




[GitHub] [beam] TheNeuralBit merged pull request #23030: Add WriteParquetBatched

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged PR #23030:
URL: https://github.com/apache/beam/pull/23030

