Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/05/01 06:31:07 UTC

[GitHub] [arrow] niyue opened a new pull request, #13041: ARROW-16430, support reading record batch custom metadata API in pyarrow

niyue opened a new pull request, #13041:
URL: https://github.com/apache/arrow/pull/13041

   In ARROW-16131, C++ APIs were added so that users can read/write record batch custom metadata in IPC files. This PR adds the corresponding pyarrow APIs so that Python users can take advantage of them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430, support reading record batch custom metadata API in pyarrow

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r862430522


##########
cpp/src/arrow/ipc/writer.h:
##########
@@ -104,8 +104,7 @@ class ARROW_EXPORT RecordBatchWriter {
   virtual Status WriteRecordBatch(
       const RecordBatch& batch,
       const std::shared_ptr<const KeyValueMetadata>& custom_metadata) {
-    return Status::NotImplemented(
-        "Write record batch with custom metadata not implemented");
+    return WriteRecordBatch(batch);

Review Comment:
   In https://github.com/apache/arrow/pull/12812, @pitrou [suggested](https://github.com/apache/arrow/pull/12812#issuecomment-1097832538) ignoring the custom_metadata by default, but I somehow missed that part, and the default implementation returns a `NotImplemented` error instead. When I was adding this API to pyarrow, one test case failed (https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_csv.py#L1895): it uses `CsvWriter`, which doesn't implement this overload.
   
   Since storing custom metadata is an IPC feature that other serialization formats probably don't support yet, I think we should switch the default implementation to discarding the provided custom_metadata.
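To illustrate the proposed default, here is a pure-Python sketch (not Arrow code: the class and method names are hypothetical stand-ins for the two C++ `WriteRecordBatch` overloads, and plain lists stand in for record batches). A writer that never overrides the metadata overload, like `CsvWriter`, keeps working and simply drops the metadata, while an IPC-style writer overrides it to persist the metadata.

```python
class RecordBatchWriterSketch:
    """Hypothetical stand-in for the C++ RecordBatchWriter base class."""

    def write_record_batch(self, batch):
        # Plain overload: every concrete writer must implement this.
        raise NotImplementedError

    def write_record_batch_with_metadata(self, batch, custom_metadata):
        # Proposed default: discard custom_metadata and fall back to the
        # plain overload, so writers that cannot store metadata still work.
        return self.write_record_batch(batch)


class CsvLikeWriter(RecordBatchWriterSketch):
    """Non-IPC writer that only implements the plain overload."""

    def __init__(self):
        self.rows = []

    def write_record_batch(self, batch):
        self.rows.extend(batch)


class IpcLikeWriter(RecordBatchWriterSketch):
    """IPC-style writer that keeps the metadata next to each batch."""

    def __init__(self):
        self.messages = []

    def write_record_batch(self, batch):
        self.write_record_batch_with_metadata(batch, None)

    def write_record_batch_with_metadata(self, batch, custom_metadata):
        self.messages.append((list(batch), custom_metadata))
```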





[GitHub] [arrow] niyue commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1124612500

   > By the way, this PR addresses `RecordBatchFileReader` but not `RecordBatchReader`. Do you plan to do that as well?
   
   I tried adding the pyarrow API for `RecordBatchReader` locally, but to test it in a meaningful way I would probably have to add the corresponding implementation to the C++ `RecordBatchStreamReader`, since it seems only `RecordBatchStreamReader` and `RecordBatchFileReader` are able to read record batch custom metadata (is this correct?). Since this PR is mainly about exposing existing C++ capability in pyarrow, I would like to keep unnecessary C++ API changes to a minimum. So I have two questions:
   
   1) Is there another way to unit test the pyarrow `RecordBatchReader.read_next_batch_with_metadata` API besides implementing it in `RecordBatchStreamReader`?
   2) Do you recommend putting all these changes together in a single PR, or keeping them as separate PRs? I initially submitted this issue as a pyarrow enhancement, but I am okay with including more API changes if that is preferred.
   




[GitHub] [arrow] niyue commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1241313990

   @pitrou I am not sure whether you saw my previous comment. I made some changes earlier and this PR is ready for review. Could you please take a look? Thanks.




[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r870968165


##########
python/pyarrow/ipc.pxi:
##########
@@ -471,17 +471,22 @@ cdef class _CRecordBatchWriter(_Weakrefable):
         else:
             raise ValueError(type(table_or_batch))
 
-    def write_batch(self, RecordBatch batch):
+    def write_batch(self, RecordBatch batch, custom_metadata=None):
         """
         Write RecordBatch to stream.
 
         Parameters
         ----------
         batch : RecordBatch
+        custom_metadata : dict

Review Comment:
   Fixed





[GitHub] [arrow] pitrou commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1122225926

   By the way, this PR addresses `RecordBatchFileReader` but not `RecordBatchReader`. Do you plan to do that as well?




[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r867284969


##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -794,6 +794,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         shared_ptr[CRecordBatch] Slice(int64_t offset)
         shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)
 
+    cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata":
+        shared_ptr[CRecordBatch] batch
+        const shared_ptr[const CKeyValueMetadata] custom_metadata

Review Comment:
   Thanks. I added a comment to explain this.





[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r980803518


##########
python/pyarrow/_flight.pyx:
##########
@@ -986,14 +986,18 @@ cdef class MetadataRecordBatchWriter(_CRecordBatchWriter):
         ----------
         batch : RecordBatch
         """
+        cdef:
+            shared_ptr[const CKeyValueMetadata] custom_metadata

Review Comment:
   I am not very familiar with the Arrow Flight API. I wanted to enhance `write_batch` here to accept `custom_metadata` as a parameter; however, there is already an API called `write_with_metadata` (also in `_flight.pyx`), so I am not sure this is the right thing to do. For now, `custom_metadata` in this API is always null. Do you have any suggestions?





[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r980795270


##########
python/pyarrow/ipc.pxi:
##########
@@ -908,6 +958,32 @@ cdef class _RecordBatchFileReader(_Weakrefable):
     # time has passed
     get_record_batch = get_batch
 
+    def get_batch_with_custom_metadata(self, int i):
+        """
+        Read the record batch with the given index along with its custom metadata

Review Comment:
   Fixed.





[GitHub] [arrow] pitrou commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r980959259


##########
python/pyarrow/_flight.pyx:
##########
@@ -986,14 +986,18 @@ cdef class MetadataRecordBatchWriter(_CRecordBatchWriter):
         ----------
         batch : RecordBatch
         """
+        cdef:
+            shared_ptr[const CKeyValueMetadata] custom_metadata

Review Comment:
   @lidavidm What do you think?





[GitHub] [arrow] niyue commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1209265836

   > @niyue Are you planning to work on this?
   > 
   > Otherwise @milesgranger you might be interested in taking this up?
   
   Hi @pitrou, sorry, I got too busy after my last commit and forgot about this issue. I am still interested in working on this and can start again next week.




[GitHub] [arrow] pitrou commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r869087167


##########
python/pyarrow/tests/test_flight.py:
##########
@@ -2027,6 +2027,27 @@ def test_large_descriptor():
             client.do_exchange(large_descriptor)
 
 
+def test_write_batch_custom_metadata():
+    data = pa.Table.from_arrays([
+        pa.array(range(0, 10 * 1024))
+    ], names=["a"])
+    batches = data.to_batches()
+
+    with ExchangeFlightServer() as server, \
+            FlightClient(("localhost", server.port)) as client:
+        descriptor = flight.FlightDescriptor.for_command(b"put")
+        writer, reader = client.do_exchange(descriptor)
+        with writer:
+            writer.begin(data.schema)
+            for i, batch in enumerate(batches):
+                writer.write_batch(batch, {"batch_id": str(i)})
+            writer.done_writing()
+            chunk = reader.read_chunk()
+            assert chunk.data is None
+            expected_buf = str(len(batches)).encode("utf-8")
+            assert chunk.app_metadata == expected_buf

Review Comment:
   It's not clear to me this test is actually ensuring the metadata was sent and received, am I missing something?





[GitHub] [arrow] github-actions[bot] commented on pull request #13041: ARROW-16430, support reading record batch custom metadata API in pyarrow

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1114147497

   :warning: Ticket **has not been started in JIRA**, please click 'Start Progress'.




[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r867283915


##########
cpp/src/arrow/ipc/writer.h:
##########
@@ -104,8 +104,7 @@ class ARROW_EXPORT RecordBatchWriter {
   virtual Status WriteRecordBatch(
       const RecordBatch& batch,
       const std::shared_ptr<const KeyValueMetadata>& custom_metadata) {
-    return Status::NotImplemented(
-        "Write record batch with custom metadata not implemented");
+    return WriteRecordBatch(batch);

Review Comment:
   It won't cause a crash, but this default implementation (ignoring custom_metadata when it is not supported) seems to make higher-level bindings easier to write. Otherwise, pyarrow's implementation, for example, would need to choose which overload to call, like this:
   ```
   def write_batch(self, batch, custom_metadata=None):
       # pseudocode: pick the C++ overload based on whether metadata was given
       if custom_metadata:
           WriteRecordBatch(batch, custom_metadata)
       else:
           WriteRecordBatch(batch)
   ```
   This would likely become a pattern in every higher-level binding, which suggests it should be handled in the lower-level API instead, so that higher-level bindings could always be implemented like this:
   ```
   def write_batch(self, batch, custom_metadata=None):
       # pseudocode: always call the metadata overload; None is ignored by default
       WriteRecordBatch(batch, custom_metadata)
   ```
   What do you think?





[GitHub] [arrow] niyue commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1217814813

   @pitrou I pushed a new commit a minute ago. Following your suggestion, I added the corresponding API to pyarrow's `RecordBatchReader`, along with a basic unit test covering it. I ran all pyarrow tests locally (using a minimal build only) and they all passed. Could you please review? Thanks.




[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r990940556


##########
python/pyarrow/ipc.pxi:
##########
@@ -687,6 +692,49 @@ cdef class RecordBatchReader(_Weakrefable):
 
         return pyarrow_wrap_batch(batch)
 
+    def read_next_batch_with_custom_metadata(self):
+        """
+        Read next RecordBatch from the stream along with its custom metadata.
+
+        Raises
+        ------
+        StopIteration:
+            At end of stream.
+
+        Returns
+        -------
+        batch : RecordBatch
+        custom_metadata : KeyValueMetadata
+        """
+        cdef:
+            CRecordBatchWithMetadata batch_with_metadata
+
+        with nogil:
+            batch_with_metadata = GetResultValue(self.reader.get().ReadNext())
+
+        if batch_with_metadata.batch.get() == NULL:
+            raise StopIteration
+
+        return _wrap_record_batch_with_metadata(batch_with_metadata)
+
+    def iter_batches_with_custom_metadata(self):
+        """
+        Read next RecordBatch from the stream along with its custom metadata as a generator
+        Raises

Review Comment:
   Fixed. 





[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r990937659


##########
python/pyarrow/ipc.pxi:
##########
@@ -832,6 +880,27 @@ cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
             self.writer = GetResultValue(
                 MakeFileWriter(c_sink, schema.sp_schema, self.options))
 
+_RecordBatchWithMetadata = namedtuple(
+    'RecordBatchWithMetadata',
+    ('batch', 'custom_metadata'))
+
+
+class RecordBatchWithMetadata(_RecordBatchWithMetadata):
+    """RecordBatch with its custom metadata
+
+    Parameters
+    ----------
+    batch: RecordBatch
+    custom_metadata: KeyValueMetadata

Review Comment:
   Fixed. Thanks.





[GitHub] [arrow] pitrou commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r869085032


##########
python/pyarrow/ipc.pxi:
##########
@@ -471,17 +471,22 @@ cdef class _CRecordBatchWriter(_Weakrefable):
         else:
             raise ValueError(type(table_or_batch))
 
-    def write_batch(self, RecordBatch batch):
+    def write_batch(self, RecordBatch batch, custom_metadata=None):
         """
         Write RecordBatch to stream.
 
         Parameters
         ----------
         batch : RecordBatch
+        custom_metadata : dict

Review Comment:
   Should this be perhaps
   ```suggestion
           custom_metadata : KeyValueMetadata or dict
   ```





[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r980794742


##########
python/pyarrow/tests/test_ipc.py:
##########
@@ -945,6 +945,62 @@ def test_ipc_zero_copy_numpy():
     assert_frame_equal(df, rdf)
 
 
+@pytest.mark.pandas
+def test_ipc_batch_with_custom_metadata_roundtrip():
+    df = pd.DataFrame({'foo': [1.5]})
+
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.BufferOutputStream()
+
+    batch_count = 2
+    with pa.ipc.new_file(sink, batch.schema) as writer:
+        for i in range(batch_count):
+            writer.write_batch(batch, {"batch_id": str(i)})
+
+    buffer = sink.getvalue()
+    source = pa.BufferReader(buffer)
+
+    with pa.ipc.open_file(source) as reader:
+        batch_with_metas = [reader.get_batch_with_custom_metadata(
+            i) for i in range(reader.num_record_batches)]
+
+    for i in range(batch_count):
+        assert batch_with_metas[i].batch.num_rows == 1
+        assert batch_with_metas[i].custom_metadata == {"batch_id": str(i)}
+
+
+def batch_with_meta_generator(reader):

Review Comment:
   Sure. I added a new Cython API, `iter_batches_with_custom_metadata`.





[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r980794376


##########
python/pyarrow/tests/test_ipc.py:
##########
@@ -945,6 +945,62 @@ def test_ipc_zero_copy_numpy():
     assert_frame_equal(df, rdf)
 
 
+@pytest.mark.pandas
+def test_ipc_batch_with_custom_metadata_roundtrip():
+    df = pd.DataFrame({'foo': [1.5]})
+
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.BufferOutputStream()
+
+    batch_count = 2
+    with pa.ipc.new_file(sink, batch.schema) as writer:
+        for i in range(batch_count):
+            writer.write_batch(batch, {"batch_id": str(i)})

Review Comment:
   I added one more batch without metadata to the test case.





[GitHub] [arrow] pitrou commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r869082265


##########
cpp/src/arrow/ipc/writer.h:
##########
@@ -104,8 +104,7 @@ class ARROW_EXPORT RecordBatchWriter {
   virtual Status WriteRecordBatch(
       const RecordBatch& batch,
       const std::shared_ptr<const KeyValueMetadata>& custom_metadata) {
-    return Status::NotImplemented(
-        "Write record batch with custom metadata not implemented");
+    return WriteRecordBatch(batch);

Review Comment:
   Well, the default implementation can then be:
   ```c++
     virtual Status WriteRecordBatch(
         const RecordBatch& batch,
         const std::shared_ptr<const KeyValueMetadata>& custom_metadata) {
       if (custom_metadata == nullptr) {
         return WriteRecordBatch(batch);
       }
       return Status::NotImplemented(
           "Write record batch with custom metadata not implemented");
     }
   ```
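A pure-Python sketch of this compromise (hypothetical names, not Arrow code): a `None` metadata argument is forwarded to the plain overload, so a binding can always call a single method, while non-null metadata on a writer that cannot store it is reported as an error instead of being silently dropped.

```python
class StrictDefaultWriter:
    """Hypothetical stand-in for RecordBatchWriter with the suggested default."""

    def write_record_batch(self, batch):
        raise NotImplementedError

    def write_record_batch_with_metadata(self, batch, custom_metadata):
        # No metadata supplied: behave exactly like the plain overload.
        if custom_metadata is None:
            return self.write_record_batch(batch)
        # Metadata supplied but this writer cannot store it: fail loudly.
        raise NotImplementedError(
            "Write record batch with custom metadata not implemented")


class CsvLikeWriter(StrictDefaultWriter):
    def __init__(self):
        self.rows = []

    def write_record_batch(self, batch):
        self.rows.extend(batch)


def write_batch(writer, batch, custom_metadata=None):
    # Binding side: a single call path, whether or not metadata is given.
    return writer.write_record_batch_with_metadata(batch, custom_metadata)
```

Compared with silently discarding, callers learn when their metadata would be lost, at the cost of having to handle the error.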





[GitHub] [arrow] pitrou commented on a diff in pull request #13041: ARROW-16430, support reading record batch custom metadata API in pyarrow

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r864589296


##########
cpp/src/arrow/ipc/writer.h:
##########
@@ -104,8 +104,7 @@ class ARROW_EXPORT RecordBatchWriter {
   virtual Status WriteRecordBatch(
       const RecordBatch& batch,
       const std::shared_ptr<const KeyValueMetadata>& custom_metadata) {
-    return Status::NotImplemented(
-        "Write record batch with custom metadata not implemented");
+    return WriteRecordBatch(batch);

Review Comment:
   I think it's ok to return `Status::NotImplemented`, but of course as long as there are no regressions in other components :-)






[GitHub] [arrow] pitrou merged pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou merged PR #13041:
URL: https://github.com/apache/arrow/pull/13041




[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r870974565


##########
python/pyarrow/tests/test_flight.py:
##########
@@ -2027,6 +2027,27 @@ def test_large_descriptor():
             client.do_exchange(large_descriptor)
 
 
+def test_write_batch_custom_metadata():
+    data = pa.Table.from_arrays([
+        pa.array(range(0, 10 * 1024))
+    ], names=["a"])
+    batches = data.to_batches()
+
+    with ExchangeFlightServer() as server, \
+            FlightClient(("localhost", server.port)) as client:
+        descriptor = flight.FlightDescriptor.for_command(b"put")
+        writer, reader = client.do_exchange(descriptor)
+        with writer:
+            writer.begin(data.schema)
+            for i, batch in enumerate(batches):
+                writer.write_batch(batch, {"batch_id": str(i)})
+            writer.done_writing()
+            chunk = reader.read_chunk()
+            assert chunk.data is None
+            expected_buf = str(len(batches)).encode("utf-8")
+            assert chunk.app_metadata == expected_buf

Review Comment:
   You are correct: this test doesn't verify that the metadata was received. I really should have pointed this out earlier.
   
   Initially, I didn't want to change the Arrow Flight API, because it is not very relevant to this PR. However, after I added an overloaded `WriteRecordBatch` API to `CRecordBatchWriter`, Cython complained about an incorrect number of arguments in one of Arrow Flight's call sites, so I made a small change to the `write_batch` API in the Arrow Flight implementation. I wanted a test case to cover that change, but I didn't want to change other C++ readers too much in this PR, so I couldn't add more assertions to this test.
   
   I have now rolled back the API change to Arrow Flight's `write_batch` and removed this test case completely. I am not yet very familiar with this part of the Arrow Flight API, and may submit another PR later if needed.





[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r870976365


##########
python/pyarrow/_flight.pyx:
##########
@@ -977,22 +977,28 @@ cdef class MetadataRecordBatchWriter(_CRecordBatchWriter):
             check_flight_status(
                 self._writer().WriteMetadata(c_buf))
 
-    def write_batch(self, RecordBatch batch):
+    def write_batch(self, RecordBatch batch, custom_metadata=None):
         """
         Write RecordBatch to stream.
 
         Parameters
         ----------
         batch : RecordBatch
         """
+
         # Override superclass method to use check_flight_status so we
         # can generate FlightWriteSizeExceededError. We don't do this
         # for write_table as callers who intend to handle the error
         # and retry with a smaller batch should be working with
         # individual batches to have control.
+
+        if not (custom_metadata is None or isinstance(custom_metadata, KeyValueMetadata)):
+            custom_metadata = KeyValueMetadata(custom_metadata)

Review Comment:
   Thanks for pointing it out. Cython is still pretty new to me; next time I will try using import/include for this.
   
   To keep the changes in this PR small, I have rolled back this part.
   
   





[GitHub] [arrow] milesgranger commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
milesgranger commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1209154378

   I can certainly pick it up if needed! 
   Presently, I'm working locally on [ARROW-13763](https://issues.apache.org/jira/browse/ARROW-13763).




[GitHub] [arrow] pitrou commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r977447182


##########
python/pyarrow/ipc.pxi:
##########
@@ -832,6 +862,26 @@ cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
             self.writer = GetResultValue(
                 MakeFileWriter(c_sink, schema.sp_schema, self.options))
 
+_RecordBatchWithMetadata = namedtuple(
+    'RecordBatchWithMetadata',
+    ('batch', 'custom_metadata'))
+
+
+class RecordBatchWithMetadata(_RecordBatchWithMetadata):
+    """RecordBatch with its custom metadata
+
+    Parameters
+    ----------
+    batch: record batch
+    custom_metadata: record batch's custom metadata
+    """
+    __slots__ = ()
+
+
+@staticmethod
+cdef _wrap_record_batch_with_metadata(CRecordBatchWithMetadata c):
+    return RecordBatchWithMetadata(pyarrow_wrap_batch(c.batch), pyarrow_wrap_metadata(c.custom_metadata))

Review Comment:
   Let's try to avoid long lines:
   ```suggestion
       return RecordBatchWithMetadata(pyarrow_wrap_batch(c.batch),
                                      pyarrow_wrap_metadata(c.custom_metadata))
   ```



##########
python/pyarrow/tests/test_ipc.py:
##########
@@ -945,6 +945,62 @@ def test_ipc_zero_copy_numpy():
     assert_frame_equal(df, rdf)
 
 
+@pytest.mark.pandas
+def test_ipc_batch_with_custom_metadata_roundtrip():
+    df = pd.DataFrame({'foo': [1.5]})
+
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.BufferOutputStream()
+
+    batch_count = 2
+    with pa.ipc.new_file(sink, batch.schema) as writer:
+        for i in range(batch_count):
+            writer.write_batch(batch, {"batch_id": str(i)})
+
+    buffer = sink.getvalue()
+    source = pa.BufferReader(buffer)
+
+    with pa.ipc.open_file(source) as reader:
+        batch_with_metas = [reader.get_batch_with_custom_metadata(
+            i) for i in range(reader.num_record_batches)]
+
+    for i in range(batch_count):
+        assert batch_with_metas[i].batch.num_rows == 1
+        assert batch_with_metas[i].custom_metadata == {"batch_id": str(i)}

Review Comment:
   Perhaps also `assert isinstance(batch_with_metas[i].custom_metadata, pa.KeyValueMetadata)`?



##########
python/pyarrow/ipc.pxi:
##########
@@ -832,6 +862,26 @@ cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
             self.writer = GetResultValue(
                 MakeFileWriter(c_sink, schema.sp_schema, self.options))
 
+_RecordBatchWithMetadata = namedtuple(
+    'RecordBatchWithMetadata',
+    ('batch', 'custom_metadata'))
+
+
+class RecordBatchWithMetadata(_RecordBatchWithMetadata):
+    """RecordBatch with its custom metadata
+
+    Parameters
+    ----------
+    batch: record batch
+    custom_metadata: record batch's custom metadata

Review Comment:
   ```suggestion
       batch: RecordBatch
       custom_metadata: KeyValueMetadata
   ```



##########
python/pyarrow/ipc.pxi:
##########
@@ -908,6 +958,32 @@ cdef class _RecordBatchFileReader(_Weakrefable):
     # time has passed
     get_record_batch = get_batch
 
+    def get_batch_with_custom_metadata(self, int i):
+        """
+        Read the record batch with the given index along with its custom metadata

Review Comment:
   ```suggestion
           Read the record batch with the given index along with
           its custom metadata.
   ```



##########
python/pyarrow/tests/test_ipc.py:
##########
@@ -945,6 +945,62 @@ def test_ipc_zero_copy_numpy():
     assert_frame_equal(df, rdf)
 
 
+@pytest.mark.pandas
+def test_ipc_batch_with_custom_metadata_roundtrip():
+    df = pd.DataFrame({'foo': [1.5]})
+
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.BufferOutputStream()
+
+    batch_count = 2
+    with pa.ipc.new_file(sink, batch.schema) as writer:
+        for i in range(batch_count):
+            writer.write_batch(batch, {"batch_id": str(i)})

Review Comment:
   Can you add a batch with no metadata?



##########
python/pyarrow/ipc.pxi:
##########
@@ -687,6 +692,31 @@ cdef class RecordBatchReader(_Weakrefable):
 
         return pyarrow_wrap_batch(batch)
 
+    def read_next_batch_with_custom_metadata(self):
+        """
+        Read next RecordBatch from the stream along with its custom metadata.
+
+        Raises
+        ------
+        StopIteration:
+            At end of stream.
+
+        Returns
+        -------
+        batch : RecordBatch
+        custom_metadata : KeyValueMetadata or dict

Review Comment:
   ```suggestion
           custom_metadata : KeyValueMetadata
   ```



##########
python/pyarrow/ipc.pxi:
##########
@@ -908,6 +958,32 @@ cdef class _RecordBatchFileReader(_Weakrefable):
     # time has passed
     get_record_batch = get_batch
 
+    def get_batch_with_custom_metadata(self, int i):
+        """
+        Read the record batch with the given index along with its custom metadata
+
+        Parameters
+        ----------
+        i : int
+            The index of the record batch in the IPC file.
+
+        Returns
+        -------
+        batch : RecordBatch
+        custom_metadata : KeyValueMetadata or dict

Review Comment:
   ```suggestion
           custom_metadata : KeyValueMetadata
   ```



##########
python/pyarrow/tests/test_ipc.py:
##########
@@ -945,6 +945,62 @@ def test_ipc_zero_copy_numpy():
     assert_frame_equal(df, rdf)
 
 
+@pytest.mark.pandas
+def test_ipc_batch_with_custom_metadata_roundtrip():
+    df = pd.DataFrame({'foo': [1.5]})
+
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.BufferOutputStream()
+
+    batch_count = 2
+    with pa.ipc.new_file(sink, batch.schema) as writer:
+        for i in range(batch_count):
+            writer.write_batch(batch, {"batch_id": str(i)})
+
+    buffer = sink.getvalue()
+    source = pa.BufferReader(buffer)
+
+    with pa.ipc.open_file(source) as reader:
+        batch_with_metas = [reader.get_batch_with_custom_metadata(
+            i) for i in range(reader.num_record_batches)]
+
+    for i in range(batch_count):
+        assert batch_with_metas[i].batch.num_rows == 1
+        assert batch_with_metas[i].custom_metadata == {"batch_id": str(i)}
+
+
+def batch_with_meta_generator(reader):
+    while True:
+        try:
+            yield reader.read_next_batch_with_custom_metadata()
+        except StopIteration:
+            return
+
+
+@pytest.mark.pandas
+def test_record_batch_reader_with_custom_metadata_roundtrip():
+    df = pd.DataFrame({'foo': [1.5]})
+
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.BufferOutputStream()
+
+    batch_count = 2
+    with pa.ipc.new_stream(sink, batch.schema) as writer:
+        for i in range(batch_count):
+            writer.write_batch(batch, {"batch_id": str(i)})

Review Comment:
   Can you add a batch with no metadata?



##########
python/pyarrow/_flight.pyx:
##########
@@ -986,14 +986,18 @@ cdef class MetadataRecordBatchWriter(_CRecordBatchWriter):
         ----------
         batch : RecordBatch
         """
+        cdef:
+            shared_ptr[const CKeyValueMetadata] custom_metadata

Review Comment:
   Do we want to allow passing the custom_metadata as an argument to `write_batch` here?



##########
python/pyarrow/tests/test_ipc.py:
##########
@@ -945,6 +945,62 @@ def test_ipc_zero_copy_numpy():
     assert_frame_equal(df, rdf)
 
 
+@pytest.mark.pandas
+def test_ipc_batch_with_custom_metadata_roundtrip():
+    df = pd.DataFrame({'foo': [1.5]})
+
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.BufferOutputStream()
+
+    batch_count = 2
+    with pa.ipc.new_file(sink, batch.schema) as writer:
+        for i in range(batch_count):
+            writer.write_batch(batch, {"batch_id": str(i)})
+
+    buffer = sink.getvalue()
+    source = pa.BufferReader(buffer)
+
+    with pa.ipc.open_file(source) as reader:
+        batch_with_metas = [reader.get_batch_with_custom_metadata(
+            i) for i in range(reader.num_record_batches)]
+
+    for i in range(batch_count):
+        assert batch_with_metas[i].batch.num_rows == 1
+        assert batch_with_metas[i].custom_metadata == {"batch_id": str(i)}
+
+
+def batch_with_meta_generator(reader):

Review Comment:
   Perhaps it would be useful to expose this as `RecordBatchReader.iter_batches_with_custom_metadata`?



##########
python/pyarrow/ipc.pxi:
##########
@@ -471,17 +471,22 @@ cdef class _CRecordBatchWriter(_Weakrefable):
         else:
             raise ValueError(type(table_or_batch))
 
-    def write_batch(self, RecordBatch batch):
+    def write_batch(self, RecordBatch batch, custom_metadata=None):
         """
         Write RecordBatch to stream.
 
         Parameters
         ----------
         batch : RecordBatch
+        custom_metadata : dict

Review Comment:
   It looks like you forgot to apply this suggestion?





[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r980795053


##########
python/pyarrow/tests/test_ipc.py:
##########
@@ -945,6 +945,62 @@ def test_ipc_zero_copy_numpy():
     assert_frame_equal(df, rdf)
 
 
+@pytest.mark.pandas
+def test_ipc_batch_with_custom_metadata_roundtrip():
+    df = pd.DataFrame({'foo': [1.5]})
+
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.BufferOutputStream()
+
+    batch_count = 2
+    with pa.ipc.new_file(sink, batch.schema) as writer:
+        for i in range(batch_count):
+            writer.write_batch(batch, {"batch_id": str(i)})
+
+    buffer = sink.getvalue()
+    source = pa.BufferReader(buffer)
+
+    with pa.ipc.open_file(source) as reader:
+        batch_with_metas = [reader.get_batch_with_custom_metadata(
+            i) for i in range(reader.num_record_batches)]
+
+    for i in range(batch_count):
+        assert batch_with_metas[i].batch.num_rows == 1
+        assert batch_with_metas[i].custom_metadata == {"batch_id": str(i)}

Review Comment:
   Assertion added.



##########
python/pyarrow/ipc.pxi:
##########
@@ -908,6 +958,32 @@ cdef class _RecordBatchFileReader(_Weakrefable):
     # time has passed
     get_record_batch = get_batch
 
+    def get_batch_with_custom_metadata(self, int i):
+        """
+        Read the record batch with the given index along with its custom metadata
+
+        Parameters
+        ----------
+        i : int
+            The index of the record batch in the IPC file.
+
+        Returns
+        -------
+        batch : RecordBatch
+        custom_metadata : KeyValueMetadata or dict

Review Comment:
   Fixed.





[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r980793088


##########
python/pyarrow/tests/test_ipc.py:
##########
@@ -945,6 +945,62 @@ def test_ipc_zero_copy_numpy():
     assert_frame_equal(df, rdf)
 
 
+@pytest.mark.pandas
+def test_ipc_batch_with_custom_metadata_roundtrip():
+    df = pd.DataFrame({'foo': [1.5]})
+
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.BufferOutputStream()
+
+    batch_count = 2
+    with pa.ipc.new_file(sink, batch.schema) as writer:
+        for i in range(batch_count):
+            writer.write_batch(batch, {"batch_id": str(i)})
+
+    buffer = sink.getvalue()
+    source = pa.BufferReader(buffer)
+
+    with pa.ipc.open_file(source) as reader:
+        batch_with_metas = [reader.get_batch_with_custom_metadata(
+            i) for i in range(reader.num_record_batches)]
+
+    for i in range(batch_count):
+        assert batch_with_metas[i].batch.num_rows == 1
+        assert batch_with_metas[i].custom_metadata == {"batch_id": str(i)}
+
+
+def batch_with_meta_generator(reader):
+    while True:
+        try:
+            yield reader.read_next_batch_with_custom_metadata()
+        except StopIteration:
+            return
+
+
+@pytest.mark.pandas
+def test_record_batch_reader_with_custom_metadata_roundtrip():
+    df = pd.DataFrame({'foo': [1.5]})
+
+    batch = pa.RecordBatch.from_pandas(df)
+    sink = pa.BufferOutputStream()
+
+    batch_count = 2
+    with pa.ipc.new_stream(sink, batch.schema) as writer:
+        for i in range(batch_count):
+            writer.write_batch(batch, {"batch_id": str(i)})

Review Comment:
   Added one more batch without metadata to the test case.





[GitHub] [arrow] pitrou commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1209135470

   @niyue Are you planning to work on this?
   Otherwise @milesgranger you might be interested in taking this up?




[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430, support reading record batch custom metadata API in pyarrow

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r862431242


##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -794,6 +794,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         shared_ptr[CRecordBatch] Slice(int64_t offset)
         shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)
 
+    cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata":
+        shared_ptr[CRecordBatch] batch
+        const shared_ptr[const CKeyValueMetadata] custom_metadata

Review Comment:
   In C++, this field has no `const` qualifier. However, without it Cython reports a const assignment error. Reading through the code, I found several similar TODO items about this:
   https://github.com/apache/arrow/blob/2a2c0873b3ae62d2d08225cc88e53ae004864a7f/python/pyarrow/includes/libarrow.pxd#L415, https://github.com/apache/arrow/blob/2a2c0873b3ae62d2d08225cc88e53ae004864a7f/python/pyarrow/includes/libarrow.pxd#L462
   
   I am not familiar with Cython at all, so for now I had to add the `const` qualifier to make it work. Please advise how this could be revised. Thanks.





[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r870968050


##########
cpp/src/arrow/ipc/writer.h:
##########
@@ -104,8 +104,7 @@ class ARROW_EXPORT RecordBatchWriter {
   virtual Status WriteRecordBatch(
       const RecordBatch& batch,
       const std::shared_ptr<const KeyValueMetadata>& custom_metadata) {
-    return Status::NotImplemented(
-        "Write record batch with custom metadata not implemented");
+    return WriteRecordBatch(batch);

Review Comment:
   Changed the default implementation as suggested.





[GitHub] [arrow] pitrou commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1130198351

   @niyue Sorry for the late reply, but the C++ `RecordBatchReader` already has a method `Result<RecordBatchWithMetadata> ReadNext()`, so you wouldn't need to add anything more to the C++ side AFAICT?




[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r980794871


##########
python/pyarrow/ipc.pxi:
##########
@@ -687,6 +692,31 @@ cdef class RecordBatchReader(_Weakrefable):
 
         return pyarrow_wrap_batch(batch)
 
+    def read_next_batch_with_custom_metadata(self):
+        """
+        Read next RecordBatch from the stream along with its custom metadata.
+
+        Raises
+        ------
+        StopIteration:
+            At end of stream.
+
+        Returns
+        -------
+        batch : RecordBatch
+        custom_metadata : KeyValueMetadata or dict

Review Comment:
   Fixed





[GitHub] [arrow] niyue commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1272829741

   @pitrou sorry for the late response. There are two failed CI builds, but they don't seem related to this change. I rebased onto the latest master branch anyway.




[GitHub] [arrow] pitrou commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1243869123

   @niyue Sorry, I had overlooked it. I'll take a look when I can. @jorisvandenbossche would you like to review this too?




[GitHub] [arrow] niyue commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1128343180

   @pitrou I made some additional changes to the PR. Could you please review it? Thanks.




[GitHub] [arrow] pitrou commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r869084288


##########
python/pyarrow/_flight.pyx:
##########
@@ -977,22 +977,28 @@ cdef class MetadataRecordBatchWriter(_CRecordBatchWriter):
             check_flight_status(
                 self._writer().WriteMetadata(c_buf))
 
-    def write_batch(self, RecordBatch batch):
+    def write_batch(self, RecordBatch batch, custom_metadata=None):
         """
         Write RecordBatch to stream.
 
         Parameters
         ----------
         batch : RecordBatch
         """
+
         # Override superclass method to use check_flight_status so we
         # can generate FlightWriteSizeExceededError. We don't do this
         # for write_table as callers who intend to handle the error
         # and retry with a smaller batch should be working with
         # individual batches to have control.
+
+        if not (custom_metadata is None or isinstance(custom_metadata, KeyValueMetadata)):
+            custom_metadata = KeyValueMetadata(custom_metadata)

Review Comment:
   I think you can use `ensure_metadata` to abstract these checks away.





[GitHub] [arrow] niyue commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1255667570

   No problem. I will check them out and see if I can get them addressed. Thanks for the review.




[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r980803518


##########
python/pyarrow/_flight.pyx:
##########
@@ -986,14 +986,18 @@ cdef class MetadataRecordBatchWriter(_CRecordBatchWriter):
         ----------
         batch : RecordBatch
         """
+        cdef:
+            shared_ptr[const CKeyValueMetadata] custom_metadata

Review Comment:
   I am not very familiar with the Arrow Flight API. I originally wanted to enhance `write_batch` here by accepting `custom_metadata` as a parameter, but I found there is already an API called `write_with_metadata` (also in `_flight.pyx`), so I am not sure that is the right thing to do. For now, `custom_metadata` in this API is always null. @pitrou Do you have any suggestions here?





[GitHub] [arrow] niyue commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1299871601

   @pitrou could you please review the PR again and let me know if anything still needs to change? Thanks.




[GitHub] [arrow] niyue commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1347744879

   No problem @pitrou . Thanks for the review and the fixes.




[GitHub] [arrow] pitrou commented on pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1217800304

   @niyue If/when this is ready for review, please say so :-)




[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430, support reading record batch custom metadata API in pyarrow

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r862431242


##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -794,6 +794,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         shared_ptr[CRecordBatch] Slice(int64_t offset)
         shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)
 
+    cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata":
+        shared_ptr[CRecordBatch] batch
+        const shared_ptr[const CKeyValueMetadata] custom_metadata

Review Comment:
   In C++, this field has no `const` qualifier. However, without it Cython reports a const assignment error. Reading through the code, I found several similar TODO items about this (https://github.com/apache/arrow/blob/2a2c0873b3ae62d2d08225cc88e53ae004864a7f/python/pyarrow/includes/libarrow.pxd#L415, https://github.com/apache/arrow/blob/2a2c0873b3ae62d2d08225cc88e53ae004864a7f/python/pyarrow/includes/libarrow.pxd#L462).
   
   I am not familiar with Cython at all, so for now I had to add the `const` qualifier to make it work. Please advise how this could be revised. Thanks.





[GitHub] [arrow] github-actions[bot] commented on pull request #13041: ARROW-16430, support reading record batch custom metadata API in pyarrow

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13041:
URL: https://github.com/apache/arrow/pull/13041#issuecomment-1114147489

   https://issues.apache.org/jira/browse/ARROW-16430




[GitHub] [arrow] pitrou commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r869085637


##########
python/pyarrow/ipc.pxi:
##########
@@ -900,6 +925,31 @@ cdef class _RecordBatchFileReader(_Weakrefable):
     # time has passed
     get_record_batch = get_batch
 
+    def get_batch_with_custom_metadata(self, int i):
+        """
+        Read the record batch with the given index along with its custom metadata
+
+        Parameters
+        ----------
+        i : int
+            The index of the record batch in the IPC file.
+
+        Returns
+        -------
+        batch : RecordBatch
+        custom_metadata: dict

Review Comment:
   @jorisvandenbossche Is this the right Numpydoc syntax when a namedtuple is returned?
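
A minimal sketch of one numpydoc convention for a namedtuple return value: document each field under Returns as its own `name : type` entry. Per the numpydoc style guide, the colon is preceded by a space, which the `custom_metadata: dict` line in the diff lacks. `min_max` and `MinMax` are hypothetical names used only for illustration.

```python
from collections import namedtuple

# Hypothetical result type with two named fields, analogous to (batch, custom_metadata).
MinMax = namedtuple("MinMax", ("minimum", "maximum"))


def min_max(values):
    """
    Return the smallest and largest element of ``values``.

    Parameters
    ----------
    values : sequence of int
        A non-empty sequence of integers.

    Returns
    -------
    minimum : int
        The smallest element.
    maximum : int
        The largest element.
    """
    return MinMax(min(values), max(values))


result = min_max([3, 1, 4, 1, 5])
print(result)  # MinMax(minimum=1, maximum=5)
```

Listing each field separately keeps the documentation valid whether callers unpack the result as a tuple or access the fields by name.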


[GitHub] [arrow] pitrou commented on a diff in pull request #13041: ARROW-16430, support reading record batch custom metadata API in pyarrow

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r864590676


##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -794,6 +794,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         shared_ptr[CRecordBatch] Slice(int64_t offset)
         shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)
 
+    cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata":
+        shared_ptr[CRecordBatch] batch
+        const shared_ptr[const CKeyValueMetadata] custom_metadata

Review Comment:
   I think it's ok to add the `const`. You can add a comment explaining this trick.


[GitHub] [arrow] lidavidm commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
lidavidm commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r982750203


##########
python/pyarrow/_flight.pyx:
##########
@@ -986,14 +986,18 @@ cdef class MetadataRecordBatchWriter(_CRecordBatchWriter):
         ----------
         batch : RecordBatch
         """
+        cdef:
+            shared_ptr[const CKeyValueMetadata] custom_metadata

Review Comment:
   Flight has a separate concept of metadata (I think the two may have been added around the same time?), so it may make sense to expose both; I think there have been requests for that in the past. That said, I think it's OK to punt that to a separate PR.


[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r980795792


##########
python/pyarrow/ipc.pxi:
##########
@@ -832,6 +862,26 @@ cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
             self.writer = GetResultValue(
                 MakeFileWriter(c_sink, schema.sp_schema, self.options))
 
+_RecordBatchWithMetadata = namedtuple(
+    'RecordBatchWithMetadata',
+    ('batch', 'custom_metadata'))
+
+
+class RecordBatchWithMetadata(_RecordBatchWithMetadata):
+    """RecordBatch with its custom metadata
+
+    Parameters
+    ----------
+    batch: record batch
+    custom_metadata: record batch's custom metadata
+    """
+    __slots__ = ()
+
+
+@staticmethod
+cdef _wrap_record_batch_with_metadata(CRecordBatchWithMetadata c):
+    return RecordBatchWithMetadata(pyarrow_wrap_batch(c.batch), pyarrow_wrap_metadata(c.custom_metadata))

Review Comment:
   Fixed.
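
The pattern in the diff (a `namedtuple` base plus a subclass that only adds a docstring and an empty `__slots__`) can be sketched with the standard library alone. `BatchWithMetadata` is a hypothetical stand-in for the PR's `RecordBatchWithMetadata`, and a plain list stands in for the record batch; this sketch does not import pyarrow.

```python
from collections import namedtuple

# Base namedtuple with the same two fields used in the diff.
_BatchWithMetadata = namedtuple("BatchWithMetadata", ("batch", "custom_metadata"))


class BatchWithMetadata(_BatchWithMetadata):
    """A (batch, custom_metadata) pair.

    Parameters
    ----------
    batch : object
        The record batch (any object in this sketch).
    custom_metadata : dict or None
        Metadata attached to the batch.
    """

    # An empty __slots__ stops the subclass from adding a per-instance __dict__,
    # so instances stay as lightweight as the base tuple.
    __slots__ = ()


pair = BatchWithMetadata(batch=[1, 2, 3], custom_metadata={"source": "sensor"})
batch, metadata = pair  # still unpacks like a plain tuple
print(pair.custom_metadata["source"])  # sensor
```

Without the empty `__slots__`, every instance of the subclass would carry a `__dict__`, costing memory and allowing arbitrary new attributes to be set on what is meant to be a plain record.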



##########
python/pyarrow/ipc.pxi:
##########
@@ -832,6 +862,26 @@ cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
             self.writer = GetResultValue(
                 MakeFileWriter(c_sink, schema.sp_schema, self.options))
 
+_RecordBatchWithMetadata = namedtuple(
+    'RecordBatchWithMetadata',
+    ('batch', 'custom_metadata'))
+
+
+class RecordBatchWithMetadata(_RecordBatchWithMetadata):
+    """RecordBatch with its custom metadata
+
+    Parameters
+    ----------
+    batch: record batch
+    custom_metadata: record batch's custom metadata

Review Comment:
   Fixed.


[GitHub] [arrow] niyue commented on a diff in pull request #13041: ARROW-16430: [Python] Add support for reading record batch custom metadata API

Posted by GitBox <gi...@apache.org>.
niyue commented on code in PR #13041:
URL: https://github.com/apache/arrow/pull/13041#discussion_r1011406639


##########
python/pyarrow/ipc.pxi:
##########
@@ -471,17 +471,22 @@ cdef class _CRecordBatchWriter(_Weakrefable):
         else:
             raise ValueError(type(table_or_batch))
 
-    def write_batch(self, RecordBatch batch):
+    def write_batch(self, RecordBatch batch, custom_metadata=None):
         """
         Write RecordBatch to stream.
 
         Parameters
         ----------
         batch : RecordBatch
+        custom_metadata : dict

Review Comment:
   I changed it to `KeyValueMetadata`, consistent with other parts of the API.
