You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by uw...@apache.org on 2016/12/09 05:50:09 UTC
arrow git commit: ARROW-404: [Python] Fix segfault caused by HdfsClient getting closed before an HdfsFile
Repository: arrow
Updated Branches:
refs/heads/master c8eb49e41 -> e139b8b7c
ARROW-404: [Python] Fix segfault caused by HdfsClient getting closed before an HdfsFile
The one downside of this patch is that HdfsFile handles don't get garbage-collected until the cyclic GC runs -- I tried to fix this but couldn't get it working. As a result, bytes are not always flushed to HDFS until `close()` is called. The flush issue should be addressed on the C++ side.
Author: Wes McKinney <we...@twosigma.com>
Closes #230 from wesm/ARROW-404 and squashes the following commits:
3a8e641 [Wes McKinney] Use weakref in _HdfsFileNanny to avoid cyclic gc
274d0c5 [Wes McKinney] amend comment
1539a2c [Wes McKinney] Ensure that HdfsClient does not get closed before an open file does when the last user-accessible client reference goes out of scope
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e139b8b7
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e139b8b7
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e139b8b7
Branch: refs/heads/master
Commit: e139b8b7c11b7f36fa57a625a39d9c8779d033f4
Parents: c8eb49e
Author: Wes McKinney <we...@twosigma.com>
Authored: Fri Dec 9 06:49:49 2016 +0100
Committer: Uwe L. Korn <uw...@xhochy.com>
Committed: Fri Dec 9 06:49:49 2016 +0100
----------------------------------------------------------------------
python/pyarrow/io.pyx | 86 ++++++++++++++++++++++------------
python/pyarrow/tests/test_hdfs.py | 23 +++++++++
2 files changed, 79 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/e139b8b7/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 0e6b81e..2fa5fb6 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -504,7 +504,7 @@ cdef class HdfsClient:
out.mode = mode
out.buffer_size = c_buffer_size
- out.parent = self
+ out.parent = _HdfsFileNanny(self, out)
out.is_open = True
out.own_file = True
@@ -516,48 +516,69 @@ cdef class HdfsClient:
"""
write_queue = Queue(50)
- f = self.open(path, 'wb')
+ with self.open(path, 'wb') as f:
+ done = False
+ exc_info = None
+ def bg_write():
+ try:
+ while not done or write_queue.qsize() > 0:
+ try:
+ buf = write_queue.get(timeout=0.01)
+ except QueueEmpty:
+ continue
- done = False
- exc_info = None
- def bg_write():
- try:
- while not done or write_queue.qsize() > 0:
- try:
- buf = write_queue.get(timeout=0.01)
- except QueueEmpty:
- continue
+ f.write(buf)
- f.write(buf)
+ except Exception as e:
+ exc_info = sys.exc_info()
- except Exception as e:
- exc_info = sys.exc_info()
-
- writer_thread = threading.Thread(target=bg_write)
- writer_thread.start()
+ writer_thread = threading.Thread(target=bg_write)
+ writer_thread.start()
- try:
- while True:
- buf = stream.read(buffer_size)
- if not buf:
- break
+ try:
+ while True:
+ buf = stream.read(buffer_size)
+ if not buf:
+ break
- write_queue.put_nowait(buf)
- finally:
- done = True
+ write_queue.put_nowait(buf)
+ finally:
+ done = True
- writer_thread.join()
- if exc_info is not None:
- raise exc_info[0], exc_info[1], exc_info[2]
+ writer_thread.join()
+ if exc_info is not None:
+ raise exc_info[0], exc_info[1], exc_info[2]
def download(self, path, stream, buffer_size=None):
- f = self.open(path, 'rb', buffer_size=buffer_size)
- f.download(stream)
+ with self.open(path, 'rb', buffer_size=buffer_size) as f:
+ f.download(stream)
# ----------------------------------------------------------------------
# Specialization for HDFS
+# ARROW-404: Helper class to ensure that files are closed before the
+# client. During deallocation of the extension class, the attributes are
+# decref'd which can cause the client to get closed first if the file has the
+# last remaining reference
+cdef class _HdfsFileNanny:
+ cdef:
+ object client
+ object file_handle_ref
+
+ def __cinit__(self, client, file_handle):
+ import weakref
+ self.client = client
+ self.file_handle_ref = weakref.ref(file_handle)
+
+ def __dealloc__(self):
+ fh = self.file_handle_ref()
+ if fh:
+ fh.close()
+ # avoid cyclic GC
+ self.file_handle_ref = None
+ self.client = None
+
cdef class HdfsFile(NativeFile):
cdef readonly:
@@ -565,6 +586,11 @@ cdef class HdfsFile(NativeFile):
object mode
object parent
+ cdef object __weakref__
+
+ def __dealloc__(self):
+ self.parent = None
+
def read(self, int nbytes):
"""
Read indicated number of bytes from the file, up to EOF
http://git-wip-us.apache.org/repos/asf/arrow/blob/e139b8b7/python/pyarrow/tests/test_hdfs.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index ed8d419..c23543b 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -98,6 +98,29 @@ def test_hdfs_ls(hdfs):
assert contents == [dir_path, f1_path]
+def _make_test_file(hdfs, test_name, test_path, test_data):
+ base_path = pjoin(HDFS_TMP_PATH, test_name)
+ hdfs.mkdir(base_path)
+
+ full_path = pjoin(base_path, test_path)
+
+ f = hdfs.open(full_path, 'wb')
+ f.write(test_data)
+
+ return full_path
+
+
+@libhdfs
+def test_hdfs_orphaned_file():
+ hdfs = hdfs_test_client()
+ file_path = _make_test_file(hdfs, 'orphaned_file_test', 'fname',
+ 'foobarbaz')
+
+ f = hdfs.open(file_path)
+ hdfs = None
+ f = None # noqa
+
+
@libhdfs
def test_hdfs_download_upload(hdfs):
base_path = pjoin(HDFS_TMP_PATH, 'upload-test')