You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/07/07 22:20:51 UTC
[beam] branch master updated: Fix Hadoop upload corrupted due to buffer reuse
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 35a51a1a411 Fix Hadoop upload corrupted due to buffer reuse
new ddc0777c3c1 Merge pull request #22177 from Fix Hadoop upload corrupted due to buffer reuse
35a51a1a411 is described below
commit 35a51a1a411a428706323e5840b1ad90b54e6ec7
Author: Yi Hu <ya...@google.com>
AuthorDate: Wed Jul 6 15:17:45 2022 -0400
Fix Hadoop upload corrupted due to buffer reuse
---
sdks/python/apache_beam/io/hadoopfilesystem.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index 9a153c0984f..c47a66c0f10 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -86,7 +86,9 @@ class HdfsUploader(filesystemio.Uploader):
self._handle = self._handle_context.__enter__()
def put(self, data):
- self._handle.write(data)
+ # hdfs uses an async writer which first adds data to a queue. To avoid the
+ # buffer being reused upstream, a copy is required here.
+ self._handle.write(bytes(data))
def finish(self):
self._handle.__exit__(None, None, None)