You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/07/07 22:20:51 UTC

[beam] branch master updated: Fix Hadoop upload corrupted due to buffer reuse

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 35a51a1a411 Fix Hadoop upload corrupted due to buffer reuse
     new ddc0777c3c1 Merge pull request #22177 from Fix Hadoop upload corrupted due to buffer reuse
35a51a1a411 is described below

commit 35a51a1a411a428706323e5840b1ad90b54e6ec7
Author: Yi Hu <ya...@google.com>
AuthorDate: Wed Jul 6 15:17:45 2022 -0400

    Fix Hadoop upload corrupted due to buffer reuse
---
 sdks/python/apache_beam/io/hadoopfilesystem.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 9a153c0984f..c47a66c0f10 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -86,7 +86,9 @@ class HdfsUploader(filesystemio.Uploader):
     self._handle = self._handle_context.__enter__()
 
   def put(self, data):
-    self._handle.write(data)
+    # hdfs uses an async writer which first adds data to a queue. Copy the data
+    # so the queued write is unaffected if the buffer gets reused upstream.
+    self._handle.write(bytes(data))
 
   def finish(self):
     self._handle.__exit__(None, None, None)