Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/04/01 16:22:48 UTC

[GitHub] [spark] holdenk commented on a change in pull request #24070: [SPARK-23961][PYTHON] Fix error when toLocalIterator goes out of scope

holdenk commented on a change in pull request #24070: [SPARK-23961][PYTHON] Fix error when toLocalIterator goes out of scope
URL: https://github.com/apache/spark/pull/24070#discussion_r270946930
 
 

 ##########
 File path: python/pyspark/rdd.py
 ##########
 @@ -138,15 +138,59 @@ def _parse_memory(s):
     return int(float(s[:-1]) * units[s[-1].lower()])
 
 
-def _load_from_socket(sock_info, serializer):
+def _create_local_socket(sock_info):
     (sockfile, sock) = local_connect_and_auth(*sock_info)
-    # The RDD materialization time is unpredicable, if we set a timeout for socket reading
+    # The RDD materialization time is unpredictable, if we set a timeout for socket reading
     # operation, it will very possibly fail. See SPARK-18281.
     sock.settimeout(None)
+    return sockfile
+
+
+def _load_from_socket(sock_info, serializer):
+    sockfile = _create_local_socket(sock_info)
     # The socket will be automatically closed when garbage-collected.
     return serializer.load_stream(sockfile)
 
 
+def _local_iterator_from_socket(sock_info, serializer):
+
+    class PyLocalIterable(object):
+        """ Create a synchronous local iterable over a socket """
+
+        def __init__(self, _sock_info, _serializer):
+            self._sockfile = _create_local_socket(_sock_info)
+            self._serializer = _serializer
+            self._read_iter = iter([])  # Initialize as empty iterator
+
+        def __iter__(self):
+            while True:
+                # Request next partition data from Java
+                write_int(1, self._sockfile)
+                self._sockfile.flush()
+
+                # If nonzero response, then there is a partition to read
+                if read_int(self._sockfile) == 0:
+                    break
+
+                # Load the partition data as a stream and read each item
+                self._read_iter = self._serializer.load_stream(self._sockfile)
+                for item in self._read_iter:
+                    yield item
+
+        def __del__(self):
+            try:
+                # Finish consuming partition data stream
+                for _ in self._read_iter:
+                    pass
+                # Tell Java to stop sending data and close connection
 
 Review comment:
   Would it make sense to send this message before finishing consuming the data coming in?
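   
   For illustration, a minimal sketch of the reordering being asked about. This is hypothetical, not the PR's implementation: it assumes the JVM side tolerates receiving the stop signal while partition data is still in flight, and it reuses `write_int` from `pyspark.serializers` as in the diff above.
   
       def __del__(self):
           try:
               # Send the stop signal first, so Java can stop producing
               # further partitions as early as possible
               write_int(0, self._sockfile)
               self._sockfile.flush()
               # Then drain whatever partition data is already buffered,
               # so the connection can be closed cleanly
               for _ in self._read_iter:
                   pass
           except Exception:
               # Ignore errors raised during garbage collection or
               # interpreter shutdown
               pass
   
   Whether this ordering actually helps depends on the server loop on the JVM side: if it only checks for the stop message between partitions, sending it early changes nothing until the current partition is drained anyway.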

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org