Posted to commits@spark.apache.org by da...@apache.org on 2015/06/29 23:36:34 UTC

spark git commit: [SPARK-7810] [PYSPARK] solve python rdd socket connection problem

Repository: spark
Updated Branches:
  refs/heads/master f6fc254ec -> ecd3aacf2


[SPARK-7810] [PYSPARK] solve python rdd socket connection problem

The "_load_from_socket" method in rdd.py cannot load data from the JVM socket when IPv6 is used; the current implementation works only with IPv4. This change should work with both protocols.

Author: Ai He <ai...@ussuning.com>
Author: AiHe <ai...@ussuning.com>

Closes #6338 from AiHe/pyspark-networking-issue and squashes the following commits:

d4fc9c4 [Ai He] handle code review 2
e75c5c8 [Ai He] handle code review
5644953 [AiHe] solve python rdd socket connection problem to jvm


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecd3aacf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecd3aacf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecd3aacf

Branch: refs/heads/master
Commit: ecd3aacf2805bb231cfb44bab079319cfe73c3f1
Parents: f6fc254
Author: Ai He <ai...@ussuning.com>
Authored: Mon Jun 29 14:36:26 2015 -0700
Committer: Davies Liu <da...@databricks.com>
Committed: Mon Jun 29 14:36:26 2015 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ecd3aacf/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1b64be2..cb20bc8 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -121,10 +121,22 @@ def _parse_memory(s):
 
 
 def _load_from_socket(port, serializer):
-    sock = socket.socket()
-    sock.settimeout(3)
+    sock = None
+    # Support for both IPv4 and IPv6.
+    # On most of IPv6-ready systems, IPv6 will take precedence.
+    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
+        af, socktype, proto, canonname, sa = res
+        try:
+            sock = socket.socket(af, socktype, proto)
+            sock.settimeout(3)
+            sock.connect(sa)
+        except socket.error:
+            sock = None
+            continue
+        break
+    if not sock:
+        raise Exception("could not open socket")
     try:
-        sock.connect(("localhost", port))
         rf = sock.makefile("rb", 65536)
         for item in serializer.load_stream(rf):
             yield item
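
For readers who want to try the connection logic outside of rdd.py, the loop in the diff above can be sketched as a standalone function. This is a minimal sketch, not the committed code: the helper name `connect_local` and its `timeout` parameter are hypothetical, and unlike the patch it closes each socket whose connection attempt fails and reports the last error.

```python
import socket


def connect_local(port, timeout=3.0):
    """Hypothetical helper: connect to localhost over IPv4 or IPv6.

    Tries every address that getaddrinfo returns for "localhost" and
    returns the first socket that connects.
    """
    last_error = None
    for af, socktype, proto, _canonname, sa in socket.getaddrinfo(
            "localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        sock = None
        try:
            sock = socket.socket(af, socktype, proto)
            sock.settimeout(timeout)
            sock.connect(sa)
            return sock
        except OSError as exc:  # socket.error is an alias of OSError on Python 3
            last_error = exc
            if sock is not None:
                sock.close()  # do not leak the socket that failed to connect
    raise OSError("could not open socket to localhost:%d (%s)" % (port, last_error))
```

If a listener is bound only to 127.0.0.1 and "localhost" resolves to ::1 first, the IPv6 attempt fails and the loop falls through to the IPv4 address. The standard library's `socket.create_connection(("localhost", port), timeout)` performs a similar iteration over `getaddrinfo` results and may be a simpler alternative where it is available.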

