Posted to reviews@spark.apache.org by jiangxb1987 <gi...@git.apache.org> on 2018/08/12 16:07:17 UTC

[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

GitHub user jiangxb1987 opened a pull request:

    https://github.com/apache/spark/pull/22085

    [SPARK-25095][PySpark] Python support for BarrierTaskContext

    ## What changes were proposed in this pull request?
    
    Add methods `barrier()` and `getTaskInfos()` to the Python `TaskContext`; these two methods are only allowed for barrier tasks.
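The intended semantics of `barrier()` are that every task in the stage blocks until all of them have reached the barrier. In PySpark this is reached through `BarrierTaskContext.get().barrier()` inside a function passed to `rdd.barrier().mapPartitions(...)`. The sketch below is only a local analogy, assuming Python threads stand in for the tasks of a barrier stage and `threading.Barrier` stands in for the JVM-side barrier. It does not use Spark, and the helper `run_stage` is hypothetical.

```python
import threading


def run_stage(num_tasks):
    """Run num_tasks threads that each record a "before" and an "after" event
    around one shared barrier, and return the event log in arrival order."""
    barrier = threading.Barrier(num_tasks)
    lock = threading.Lock()
    log = []

    def task(partition_id):
        with lock:
            log.append(("before", partition_id))
        # Analogous to BarrierTaskContext.barrier(): block until all tasks arrive.
        barrier.wait()
        with lock:
            log.append(("after", partition_id))

    threads = [threading.Thread(target=task, args=(i,)) for i in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return log


events = run_stage(4)
# Every "before" entry precedes every "after" entry.
print([phase for phase, _ in events])
```

Because each thread records its "before" event before calling `wait()`, and no call to `wait()` returns until all four threads have called it, no "after" event can be logged while any task is still short of the barrier.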
    
    ## How was this patch tested?
    
    TBD


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jiangxb1987/spark python.barrier

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22085.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22085
    
----
commit 7b488299709f715d344e5c38956577f31718ab34
Author: Xingbo Jiang <xi...@...>
Date:   2018-08-12T16:04:20Z

    implement python barrier taskcontext

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211358743
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  sock.setSoTimeout(10000)
    --- End diff --
    
    Should add a comment about this timeout.
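For context on the timeout in question: `setSoTimeout(0)` on the `ServerSocket` makes `accept()` block indefinitely, while `sock.setSoTimeout(10000)` bounds each read on an accepted connection to 10 seconds. A Python client that connects but never sends its request therefore makes the JVM-side read fail with a `SocketTimeoutException` instead of hanging the thread. The sketch below reproduces that behaviour with Python's `socket` module, assuming a `socketpair()` in place of the real connection and a shortened 0.2 s timeout. The helper `read_request_with_timeout` is hypothetical.

```python
import socket


def read_request_with_timeout(conn, timeout_s):
    """Try to read up to 4 bytes (e.g. a length prefix) from conn.

    Returns the bytes read, or None if nothing arrived within timeout_s,
    mirroring a JVM read that throws SocketTimeoutException once the
    setSoTimeout(...) deadline passes.
    """
    conn.settimeout(timeout_s)
    try:
        return conn.recv(4)
    except socket.timeout:
        return None


server_side, client_side = socket.socketpair()
try:
    # The "Python side" stays silent, so the server-side read times out.
    silent = read_request_with_timeout(server_side, 0.2)
    # Once the client sends its request, the read succeeds before the deadline.
    client_side.sendall(b"\x00\x00\x00\x03")
    answered = read_request_with_timeout(server_side, 0.2)
finally:
    server_side.close()
    client_side.close()

print(silent, answered)
```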


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211356245
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +99,126 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +def _load_from_socket(port, auth_secret):
    --- End diff --
    
    Should document how this is different from the one in `context.py`.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209490553
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    +              secret)
    +            .build())
    --- End diff --
    
    We have to port `BarrierTaskContext` from the Java side to the Python side; otherwise there is no way to call `BarrierTaskContext.barrier()` from Python. That is why the JavaGateway is only initiated when the context is a `BarrierTaskContext`.


---



[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2284/


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r213043068
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(/* port */ 0,
    +            /* backlog */ 1,
    +            InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  // Wait for function call from python side.
    +                  sock.setSoTimeout(10000)
    +                  val input = new DataInputStream(sock.getInputStream())
    --- End diff --
    
    (I'd also like to do some refactoring of the socket setup code in Python, and that can go further if we do authentication first here.)
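The flow under discussion has the Python side connect to a server socket bound to `localhost` on an ephemeral port (port 0, backlog 1) and then make a method call. The self-contained sketch below performs authentication first, as suggested above, and only then sends the function name. It assumes PySpark-style framing (a 4-byte big-endian length followed by UTF-8 bytes) and a plain `"ok"` reply to a correct secret. The server thread stands in for the JVM's `accept-connections` thread. Every name here (`serve_one_call`, `call_method`, `start_server`, the `"done"` reply, the secret) is hypothetical, not Spark's actual code.

```python
import hmac
import socket
import struct
import threading


def write_utf8(f, text):
    """Write a 4-byte big-endian length, then the UTF-8 bytes, and flush."""
    data = text.encode("utf-8")
    f.write(struct.pack("!i", len(data)))
    f.write(data)
    f.flush()


def read_utf8(f):
    """Read one string framed by write_utf8."""
    (length,) = struct.unpack("!i", f.read(4))
    return f.read(length).decode("utf-8")


def serve_one_call(server, secret):
    """Stand-in for the JVM accept thread: authenticate, then answer one call."""
    conn, _ = server.accept()
    conn.settimeout(10)  # bound each read, like sock.setSoTimeout(10000)
    with conn, conn.makefile("rwb") as f:
        received = read_utf8(f).encode("utf-8")
        if not hmac.compare_digest(received, secret.encode("utf-8")):
            write_utf8(f, "err")
            return
        write_utf8(f, "ok")
        method = read_utf8(f)
        # Hypothetical dispatch; the real JVM side would call into the context.
        write_utf8(f, "done" if method == "run" else "unknown method")


def call_method(port, secret, method):
    """Connect to localhost:port, authenticate first, then invoke `method`."""
    sock = None
    # Try each address "localhost" resolves to (IPv4 and/or IPv6).
    for af, socktype, proto, _, sa in socket.getaddrinfo(
            "localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        sock = socket.socket(af, socktype, proto)
        try:
            sock.connect(sa)
            break
        except OSError:
            sock.close()
            sock = None
    if sock is None:
        raise RuntimeError("could not open socket")
    with sock, sock.makefile("rwb") as f:
        write_utf8(f, secret)  # authentication happens before the call
        if read_utf8(f) != "ok":
            raise RuntimeError("authentication failed")
        write_utf8(f, method)
        return read_utf8(f)


def start_server(secret):
    """Bind to localhost on an OS-chosen port with backlog 1; serve one call."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("localhost", 0))
    server.listen(1)
    thread = threading.Thread(target=serve_one_call, args=(server, secret), daemon=True)
    thread.start()
    return server, thread


server, thread = start_server("s3cret")  # hypothetical secret
reply = call_method(server.getsockname()[1], "s3cret", "run")
thread.join()
server.close()
print(reply)
```

Authenticating before the method name is read means the server never interprets a request from an unauthenticated peer, which is the ordering the comment above argues for.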


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209835246
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +95,92 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +class BarrierTaskContext(TaskContext):
    +
    +    """
    +    .. note:: Experimental
    +
    +    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
    +    for a running task, use:
    +    L{BarrierTaskContext.get()}.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    _barrierContext = None
    +
    +    def __init__(self):
    +        """Construct a BarrierTaskContext, use get instead"""
    +        pass
    --- End diff --
    
    BTW, if this constructor should be banned, I would rather throw an exception here.
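A minimal sketch of what this suggestion could look like: the public constructor raises, and the internal `_getOrCreate` classmethod builds the singleton with `object.__new__`, which bypasses `__init__`. The class `BarrierTaskContextSketch` is a simplified, hypothetical stand-in rather than PySpark's class, and the exception type and message are illustrative choices.

```python
class BarrierTaskContextSketch(object):
    """Simplified stand-in for a context that must be obtained via get()."""

    _taskContext = None

    def __init__(self):
        # Direct construction is banned; callers should use get() instead.
        raise RuntimeError(
            "Do not construct BarrierTaskContextSketch directly; use get()")

    @classmethod
    def _getOrCreate(cls):
        """Internal: return the global instance, creating it without __init__."""
        if cls._taskContext is None:
            cls._taskContext = object.__new__(cls)
        return cls._taskContext

    @classmethod
    def get(cls):
        """Return the active context, or None if it has not been created yet."""
        return cls._taskContext


ctx = BarrierTaskContextSketch._getOrCreate()
print(BarrierTaskContextSketch.get() is ctx)
```

The ban only affects user code that calls the class directly; the internal creation path keeps working because it never goes through `__init__`.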


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209464621
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -29,6 +29,7 @@ class TaskContext(object):
         """
     
         _taskContext = None
    +    _javaContext = None
    --- End diff --
    
    `_barrierContext`?


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211683369
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +99,126 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +def _load_from_socket(port, auth_secret):
    +    """
    +    Load data from a given socket, this is a blocking method thus only return when the socket
    +    connection has been closed.
    +    """
    +    sock = None
    +    # Support for both IPv4 and IPv6.
    +    # On most of IPv6-ready systems, IPv6 will take precedence.
    +    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
    +        af, socktype, proto, canonname, sa = res
    +        sock = socket.socket(af, socktype, proto)
    +        try:
    +            # Do not allow timeout for socket reading operation.
    +            sock.settimeout(None)
    +            sock.connect(sa)
    +        except socket.error:
    +            sock.close()
    +            sock = None
    +            continue
    +        break
    +    if not sock:
    +        raise Exception("could not open socket")
    +
    +    sockfile = sock.makefile("rwb", 65536)
    +    write_with_length("run".encode("utf-8"), sockfile)
    +    sockfile.flush()
    +    do_server_auth(sockfile, auth_secret)
    +
    +    # The socket will be automatically closed when garbage-collected.
    +    return UTF8Deserializer().loads(sockfile)
    +
    +
    +class BarrierTaskContext(TaskContext):
    +
    +    """
    +    .. note:: Experimental
    +
    +    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
    +    for a running task, use:
    +    L{BarrierTaskContext.get()}.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    _port = None
    +    _secret = None
    +
    +    def __init__(self):
    +        """Construct a BarrierTaskContext, use get instead"""
    +        pass
    +
    +    @classmethod
    +    def _getOrCreate(cls):
    +        """Internal function to get or create global BarrierTaskContext."""
    +        if cls._taskContext is None:
    --- End diff --
    
    IIUC, reusing Python workers just means we start a Python worker from a daemon thread; it should not affect the input/output files related to worker.py.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209491244
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    +              secret)
    +            .build())
    --- End diff --
    
    If this must target 2.4.0, don't let me block it, since it's a new feature and we could probably consider another approach later. But if we can avoid this approach, I would suggest avoiding it for now.
    
    Let me try to track down the design doc and the related changes. I think I need more time to check why it was done this way and whether there's another way.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94681/


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209473919
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +96,33 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +    def barrier(self):
    +        """
    +        .. note:: Experimental
    +
    +        Sets a global barrier and waits until all tasks in this stage hit this barrier.
    +        Note this method is only allowed for a BarrierTaskContext.
    +
    +        .. versionadded:: 2.4.0
    +        """
    +        if self._javaContext is None:
    +            raise Exception("Not supported to call barrier() inside a non-barrier task.")
    +        else:
    +            self._javaContext.barrier()
    +
    +    def getTaskInfos(self):
    +        """
    +        .. note:: Experimental
    +
    +        Returns the all task infos in this barrier stage, the task infos are ordered by
    +        partitionId.
    +        Note this method is only allowed for a BarrierTaskContext.
    +
    +        .. versionadded:: 2.4.0
    +        """
    +        if self._javaContext is None:
    +            raise Exception("Not supported to call getTaskInfos() inside a non-barrier task.")
    +        else:
    +            java_list = self._javaContext.getTaskInfos()
    +            return [h for h in java_list]
    --- End diff --
    
    Create a `BarrierTaskInfo` class that wraps the Java object.
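One way to read this suggestion, sketched under assumptions: a small Python class copies the fields it needs out of each Java-side object, so callers get plain Python values instead of raw gateway proxies. Here `types.SimpleNamespace` objects stand in for the Java `BarrierTaskInfo` instances, and the helper `wrap_task_infos` and the sample addresses are hypothetical. Only the `address` field comes from the code under review.

```python
from types import SimpleNamespace


class BarrierTaskInfo(object):
    """Python-side wrapper carrying the info of one barrier task."""

    def __init__(self, jobj):
        # `jobj` is the Java-side object; copy out a plain Python value.
        # With a real Py4J proxy, a Scala `val` is exposed as an accessor
        # method, so this may need to be jobj.address(); the stand-in used
        # below exposes a plain attribute instead.
        self.address = jobj.address

    def __repr__(self):
        return "BarrierTaskInfo(address=%r)" % (self.address,)


def wrap_task_infos(java_list):
    """Wrap every Java task info, preserving the incoming partitionId order."""
    return [BarrierTaskInfo(j) for j in java_list]


# Hypothetical stand-ins for the objects returned by getTaskInfos().
fake_java_infos = [SimpleNamespace(address="worker-%d" % i) for i in range(3)]
infos = wrap_task_infos(fake_java_infos)
print(infos)
```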


---



[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209464515
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    +              secret)
    +            .build())
    +        } else {
    +          None
    +        }
    +        gatewayServer.map(_.start())
    +        gatewayServer.foreach { server =>
    +          context.addTaskCompletionListener(_ => server.shutdown())
    +        }
    +        val boundPort: Int = gatewayServer.map(_.getListeningPort).getOrElse(0)
    +        if (boundPort == -1) {
    +          val message = "GatewayServer to port BarrierTaskContext failed to bind to Java side."
    +          logError(message)
    +          throw new SparkException(message)
    +        } else {
    +          logDebug(s"Started GatewayServer to port BarrierTaskContext on port $boundPort.")
    --- End diff --
    
    When `isBarrier` is false, I don't think we need to show this.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2126/


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2390/


---



[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #94901 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94901/testReport)** for PR 22085 at commit [`e234a0a`](https://github.com/apache/spark/commit/e234a0a3d4e740d757fe086b0971a10f621d518b).
     * This patch **fails from timeout after a configured wait of `340m`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211355022
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -20,15 +20,16 @@ package org.apache.spark.api.python
     import java.io._
     import java.net._
     import java.nio.charset.StandardCharsets
    +import java.nio.charset.StandardCharsets.UTF_8
     import java.util.concurrent.atomic.AtomicBoolean
     
     import scala.collection.JavaConverters._
     
    -import org.apache.spark._
    +import org.apache.spark.{SparkException, _}
    --- End diff --
    
    `_` already includes `SparkException`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #94650 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94650/testReport)** for PR 22085 at commit [`7b48829`](https://github.com/apache/spark/commit/7b488299709f715d344e5c38956577f31718ab34).


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94738/


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #94681 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94681/testReport)** for PR 22085 at commit [`289146d`](https://github.com/apache/spark/commit/289146d9e2533d1a48535288a0d5c8f1d3b51f14).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class BarrierTaskContext(TaskContext):`
      * `class BarrierTaskInfo(object):`


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #94738 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94738/testReport)** for PR 22085 at commit [`05c9609`](https://github.com/apache/spark/commit/05c9609e70d9fd9e40de599bc178b47bedb01a56).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22085


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211356840
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -76,6 +77,15 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       // TODO: support accumulator in multiple UDF
       protected val accumulator = funcs.head.funcs.head.accumulator
     
    +  // Expose a ServerSocket to support method calls via socket from Python side.
    +  private[spark] var serverSocket: Option[ServerSocket] = None
    +
    +  // Authentication helper used when serving method calls via socket from Python side.
    +  private lazy val authHelper = {
    +    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
    --- End diff --
    
    When does `SparkEnv.get` return null?


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #94951 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94951/testReport)** for PR 22085 at commit [`ba0ccad`](https://github.com/apache/spark/commit/ba0ccad6e074b378cfd454eeb90f8e6da1e321c6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209846015
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +95,92 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +class BarrierTaskContext(TaskContext):
    +
    +    """
    +    .. note:: Experimental
    +
    +    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
    +    for a running task, use:
    +    L{BarrierTaskContext.get()}.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    _barrierContext = None
    +
    +    def __init__(self):
    +        """Construct a BarrierTaskContext, use get instead"""
    +        pass
    +
    +    @classmethod
    +    def _getOrCreate(cls):
    +        """Internal function to get or create global BarrierTaskContext."""
    +        if cls._taskContext is None:
    +            cls._taskContext = BarrierTaskContext()
    +        return cls._taskContext
    +
    +    @classmethod
    +    def get(cls):
    +        """
    +        Return the currently active BarrierTaskContext. This can be called inside of user functions
    +        to access contextual information about running tasks.
    +
    +        .. note:: Must be called on the worker, not the driver. Returns None if not initialized.
    +        """
    +        return cls._taskContext
    +
    +    @classmethod
    +    def _initialize(cls, ctx):
    +        """
    +        Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called
    +        after BarrierTaskContext is initialized.
    +        """
    +        cls._barrierContext = ctx
    +
    +    def barrier(self):
    +        """
    +        .. note:: Experimental
    +
    +        Sets a global barrier and waits until all tasks in this stage hit this barrier.
    +        Note this method is only allowed for a BarrierTaskContext.
    +
    +        .. versionadded:: 2.4.0
    +        """
    +        if self._barrierContext is None:
    +            raise Exception("Not supported to call barrier() before initialize " +
    +                            "BarrierTaskContext.")
    +        else:
    +            self._barrierContext.barrier()
    +
    +    def getTaskInfos(self):
    +        """
    +        .. note:: Experimental
    +
    +        Returns the all task infos in this barrier stage, the task infos are ordered by
    +        partitionId.
    +        Note this method is only allowed for a BarrierTaskContext.
    +
    +        .. versionadded:: 2.4.0
    +        """
    +        if self._barrierContext is None:
    +            raise Exception("Not supported to call getTaskInfos() before initialize " +
    +                            "BarrierTaskContext.")
    +        else:
    +            java_list = self._barrierContext.getTaskInfos()
    +            return [BarrierTaskInfo(h) for h in java_list]
    +
    +
    +class BarrierTaskInfo(object):
    +    """
    +    .. note:: Experimental
    +
    +    Carries all task infos of a barrier task.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    def __init__(self, info):
    +        self.address = info.address
    --- End diff --
    
    * should be `info.address`
    * better to rename `info` to `jobj` to make it clear this is from Java
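The `barrier()` docstring in the diff says the call blocks until every task in the stage reaches it. The sketch below is a purely local analogy for those semantics. It is not Spark's implementation, which coordinates tasks across executors. It uses Python's `threading.Barrier`, with one thread standing in for each task; `run_tasks` is a hypothetical helper.

```python
import threading


def run_tasks(num_tasks):
    """Run num_tasks threads that each record an event before and after a barrier."""
    barrier = threading.Barrier(num_tasks)
    lock = threading.Lock()
    events = []

    def task(partition_id):
        with lock:
            events.append(("before", partition_id))
        # Like BarrierTaskContext.barrier(), wait() returns only once all
        # num_tasks participants have called it.
        barrier.wait()
        with lock:
            events.append(("after", partition_id))

    threads = [threading.Thread(target=task, args=(i,)) for i in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return events
```

Every "before" event is recorded ahead of every "after" event, whatever order the threads are scheduled in. In PySpark 2.4+, the corresponding user code would call `BarrierTaskContext.get().barrier()` inside a function passed to `rdd.barrier().mapPartitions(...)`. The thread analogy ignores cross-executor coordination and failure handling.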


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95037/


---



[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #94901 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94901/testReport)** for PR 22085 at commit [`e234a0a`](https://github.com/apache/spark/commit/e234a0a3d4e740d757fe086b0971a10f621d518b).


---



[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #94942 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94942/testReport)** for PR 22085 at commit [`243a5a3`](https://github.com/apache/spark/commit/243a5a30443a98329dbf421233eda08e3118cb41).


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #95040 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95040/testReport)** for PR 22085 at commit [`1cacd40`](https://github.com/apache/spark/commit/1cacd40fec8e1156d2fa0d62761e78afd1781f5a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209862336
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +95,92 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +class BarrierTaskContext(TaskContext):
    +
    +    """
    +    .. note:: Experimental
    +
    +    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
    +    for a running task, use:
    +    L{BarrierTaskContext.get()}.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    _barrierContext = None
    +
    +    def __init__(self):
    +        """Construct a BarrierTaskContext, use get instead"""
    +        pass
    --- End diff --
    
    Ah, this is called in `_getOrCreate`. Sorry, I read it too quickly. In this case, frankly, I think we could either remove it, since Python injects that default constructor anyway, or monkey patch it to disallow initialization (like we did for `ImageSchema`), but I guess we don't necessarily need to be super clever here.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211359959
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -381,6 +465,20 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
           }
         }
       }
    +
    +  def writeUTF(str: String, dataOut: DataOutputStream) {
    +    val bytes = str.getBytes(StandardCharsets.UTF_8)
    --- End diff --
    
    nit: either use `UTF_8` or always use `StandardCharsets.UTF_8`, but be consistent.
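The `writeUTF` helper in the diff starts by encoding the string as UTF-8. The sketch below assumes it mirrors `PythonRDD.writeUTF`: a 4-byte big-endian length, as written by `DataOutputStream.writeInt`, followed by the encoded bytes. Under that assumption, a matching Python-side encoder and decoder might look like this. The function names are hypothetical.

```python
import io
import struct


def write_utf(s, stream):
    """Write a 4-byte big-endian length prefix followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    stream.write(struct.pack(">i", len(data)))
    stream.write(data)


def read_utf(stream):
    """Read one string framed by write_utf (or an equivalent JVM writer)."""
    header = stream.read(4)
    if len(header) != 4:
        raise EOFError("stream ended before the length prefix")
    (length,) = struct.unpack(">i", header)
    data = stream.read(length)
    if len(data) != length:
        raise EOFError("stream ended before the payload")
    return data.decode("utf-8")


def roundtrip(s):
    """Encode s into an in-memory buffer, then decode it back."""
    buf = io.BytesIO()
    write_utf(s, buf)
    encoded = buf.getvalue()
    buf.seek(0)
    return encoded, read_utf(buf)
```

The prefix counts encoded bytes, not characters. For example, `"héllo"` is framed with length 6 because `é` takes two bytes in UTF-8.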


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209853276
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +95,92 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +class BarrierTaskContext(TaskContext):
    +
    +    """
    +    .. note:: Experimental
    +
    +    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
    +    for a running task, use:
    +    L{BarrierTaskContext.get()}.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    _barrierContext = None
    +
    +    def __init__(self):
    +        """Construct a BarrierTaskContext, use get instead"""
    +        pass
    --- End diff --
    
    This just follows `TaskContext.__init__()`. Shall we update both?


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209491060
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    +              secret)
    +            .build())
    --- End diff --
    
    Yeah, I read it and understand that this is only initialised when the context is a `BarrierTaskContext`, but it is super weird that we start another Java gateway here. If it's a hard requirement, then I suspect a design issue. Should this be targeted to 2.4.0, @mengxr?


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r212168597
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +99,143 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +BARRIER_FUNCTION = 1
    +
    +
    +def _load_from_socket(port, auth_secret):
    +    """
    +    Load data from a given socket, this is a blocking method thus only return when the socket
    +    connection has been closed.
    +
    +    This is copied from context.py, while modified the message protocol.
    --- End diff --
    
    It would be nicer if we could deduplicate it later.
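The `_load_from_socket(port, auth_secret)` docstring in the diff describes a blocking client that talks to the JVM over a local socket. The minimal client sketch below rests on several assumptions that the diff does not confirm:

* the secret and the reply are length-prefixed UTF-8 strings;
* the function id is sent as a 4-byte big-endian int;
* the id value is the `BARRIER_FUNCTION = 1` constant shown in the diff.

The exact wire order and the `call_barrier` / `connect_and_call_barrier` names are hypothetical.

```python
import socket
import struct

BARRIER_FUNCTION = 1  # constant taken from the diff


def _write_utf(f, s):
    data = s.encode("utf-8")
    f.write(struct.pack(">i", len(data)))
    f.write(data)


def _read_utf(f):
    (length,) = struct.unpack(">i", f.read(4))
    return f.read(length).decode("utf-8")


def call_barrier(sock, auth_secret):
    """Authenticate, request the barrier function, then block for the reply."""
    # Buffered file wrappers make the framed reads and writes simpler.
    rfile = sock.makefile("rb")
    wfile = sock.makefile("wb")
    try:
        _write_utf(wfile, auth_secret)
        wfile.write(struct.pack(">i", BARRIER_FUNCTION))
        wfile.flush()
        # For barrier() the server answers only once every task has arrived,
        # so this read may block for a long time.
        return _read_utf(rfile)
    finally:
        rfile.close()
        wfile.close()


def connect_and_call_barrier(port, auth_secret):
    """Connect to the JVM-side server on localhost and issue one barrier call."""
    with socket.create_connection(("localhost", port)) as sock:
        return call_barrier(sock, auth_secret)
```

Taking an already-connected socket in `call_barrier` keeps the framing logic testable without a real server. A shared helper of this shape is one possible way to deduplicate the copy from `context.py`.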


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211358615
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  sock.setSoTimeout(10000)
    +                  val cmdString = readUtf8(sock)
    +                  if (cmdString.equals("run")) {
    --- End diff --
    
    If we do not expect any other command from the socket, we should throw an exception.
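The suggestion is that an unrecognized command should fail loudly rather than fall through silently. Below is a minimal Python sketch of that dispatch step. It assumes commands arrive as strings such as the `"run"` read in the diff. `dispatch_command` and `UnknownCommandError` are hypothetical names.

```python
class UnknownCommandError(ValueError):
    """Raised when the peer sends a command this server does not implement."""


def dispatch_command(cmd, handlers):
    """Run the handler registered for cmd; fail loudly on anything else."""
    handler = handlers.get(cmd)
    if handler is None:
        # Raising makes a protocol mismatch visible instead of leaving the
        # Python side waiting on a reply that never arrives.
        raise UnknownCommandError("unrecognized command: %r" % (cmd,))
    return handler()
```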


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211359028
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  sock.setSoTimeout(10000)
    +                  val cmdString = readUtf8(sock)
    +                  if (cmdString.equals("run")) {
    +                    sock.setSoTimeout(0)
    +                    barrierAndServe(sock)
    +                  }
    +                } catch {
    +                  case _: SocketException =>
    --- End diff --
    
    Is this the timeout exception? I don't see any exception that we could silently ignore.
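One common alternative to swallowing every `SocketException` is sketched below in Python. It assumes the only expected error is the one raised when the owner closes the listening socket during shutdown. That case is treated as a normal exit from the accept loop, and every other error is re-raised. The `accept_loop` name and the `stopped` event are hypothetical.

```python
import logging

log = logging.getLogger(__name__)


def accept_loop(server, handle, stopped):
    """Serve connections until shutdown; only the shutdown error is swallowed.

    `server` is any object with a blocking accept() (e.g. a listening
    socket.socket); `stopped` is a threading.Event the owner sets right
    before closing the server during shutdown.
    """
    while True:
        try:
            conn, _addr = server.accept()
        except OSError:
            if stopped.is_set():
                # Expected during shutdown: leave the loop quietly.
                log.debug("server socket closed, stopping accept loop")
                return
            # Any other socket error is a real failure; do not hide it.
            raise
        try:
            handle(conn)
        finally:
            conn.close()
```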


---



[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95040/


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #95037 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95037/testReport)** for PR 22085 at commit [`2a8f3cb`](https://github.com/apache/spark/commit/2a8f3cb210d78afa862a3a51bb8fb7b4e4c54623).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Thank you so much @mengxr and @jiangxb1987.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #95040 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95040/testReport)** for PR 22085 at commit [`1cacd40`](https://github.com/apache/spark/commit/1cacd40fec8e1156d2fa0d62761e78afd1781f5a).


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r213050049
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(/* port */ 0,
    +            /* backlog */ 1,
    +            InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  // Wait for function call from python side.
    +                  sock.setSoTimeout(10000)
    +                  val input = new DataInputStream(sock.getInputStream())
    --- End diff --
    
    Thanks for catching this. Yeah, I agree it would be better to move the authentication before recognising the function.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    LGTM. I'm merging this into master. We might need a minor refactor for readability, but it shouldn't block developers from testing this new feature. Thanks!


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209846054
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +95,92 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +class BarrierTaskContext(TaskContext):
    +
    +    """
    +    .. note:: Experimental
    +
    +    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
    +    for a running task, use:
    +    L{BarrierTaskContext.get()}.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    _barrierContext = None
    +
    +    def __init__(self):
    +        """Construct a BarrierTaskContext, use get instead"""
    +        pass
    +
    +    @classmethod
    +    def _getOrCreate(cls):
    +        """Internal function to get or create global BarrierTaskContext."""
    +        if cls._taskContext is None:
    +            cls._taskContext = BarrierTaskContext()
    +        return cls._taskContext
    +
    +    @classmethod
    +    def get(cls):
    +        """
    +        Return the currently active BarrierTaskContext. This can be called inside of user functions
    +        to access contextual information about running tasks.
    +
    +        .. note:: Must be called on the worker, not the driver. Returns None if not initialized.
    +        """
    +        return cls._taskContext
    +
    +    @classmethod
    +    def _initialize(cls, ctx):
    +        """
    +        Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called
    +        after BarrierTaskContext is initialized.
    +        """
    +        cls._barrierContext = ctx
    +
    +    def barrier(self):
    +        """
    +        .. note:: Experimental
    +
    +        Sets a global barrier and waits until all tasks in this stage hit this barrier.
    +        Note this method is only allowed for a BarrierTaskContext.
    +
    +        .. versionadded:: 2.4.0
    +        """
    +        if self._barrierContext is None:
    +            raise Exception("Not supported to call barrier() before initialize " +
    +                            "BarrierTaskContext.")
    +        else:
    +            self._barrierContext.barrier()
    +
    +    def getTaskInfos(self):
    +        """
    +        .. note:: Experimental
    +
    +        Returns the all task infos in this barrier stage, the task infos are ordered by
    +        partitionId.
    +        Note this method is only allowed for a BarrierTaskContext.
    +
    +        .. versionadded:: 2.4.0
    +        """
    +        if self._barrierContext is None:
    +            raise Exception("Not supported to call getTaskInfos() before initialize " +
    +                            "BarrierTaskContext.")
    +        else:
    +            java_list = self._barrierContext.getTaskInfos()
    +            return [BarrierTaskInfo(h) for h in java_list]
    +
    +
    +class BarrierTaskInfo(object):
    +    """
    +    .. note:: Experimental
    +
    +    Carries all task infos of a barrier task.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    def __init__(self, info):
    +        self.address = info.address
    --- End diff --
    
    * should be `info.address()`
    * better to rename `info` to `jobj` to make it clear this is from Java


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209464679
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    +              secret)
    +            .build())
    +        } else {
    +          None
    +        }
    +        gatewayServer.map(_.start())
    +        gatewayServer.foreach { server =>
    +          context.addTaskCompletionListener(_ => server.shutdown())
    +        }
    +        val boundPort: Int = gatewayServer.map(_.getListeningPort).getOrElse(0)
    +        if (boundPort == -1) {
    +          val message = "GatewayServer to port BarrierTaskContext failed to bind to Java side."
    +          logError(message)
    +          throw new SparkException(message)
    +        } else {
    +          logDebug(s"Started GatewayServer to port BarrierTaskContext on port $boundPort.")
    +        }
             // Write out the TaskContextInfo
    --- End diff --
    
    This comment should be moved too.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209476191
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    +              secret)
    +            .build())
    --- End diff --
    
    Wait, wait... do you really start another Java gateway for each worker? (Or did I read this code too quickly?) Can you elaborate on why this is needed? We should avoid this unless it's strictly necessary.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #94942 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94942/testReport)** for PR 22085 at commit [`243a5a3`](https://github.com/apache/spark/commit/243a5a30443a98329dbf421233eda08e3118cb41).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94951/


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r212762076
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(/* port */ 0,
    +            /* backlog */ 1,
    +            InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  // Wait for function call from python side.
    +                  sock.setSoTimeout(10000)
    +                  val input = new DataInputStream(sock.getInputStream())
    +                  input.readInt() match {
    +                    case BarrierTaskContextMessageProtocol.BARRIER_FUNCTION =>
    +                      // The barrier() function may wait infinitely, socket shall not timeout
    +                      // before the function finishes.
    +                      sock.setSoTimeout(0)
    +                      barrierAndServe(sock)
    +
    +                    case _ =>
    +                      val out = new DataOutputStream(new BufferedOutputStream(
    +                        sock.getOutputStream))
    +                      writeUTF(BarrierTaskContextMessageProtocol.ERROR_UNRECOGNIZED_FUNCTION, out)
    +                  }
    +                } catch {
    +                  case e: SocketException if e.getMessage.contains("Socket closed") =>
    +                    // It is possible that the ServerSocket is not closed, but the native socket
    +                    // has already been closed, we shall catch and silently ignore this case.
    +                } finally {
    +                  if (sock != null) {
    +                    sock.close()
    +                  }
    +                }
    +              }
    +            }
    +          }.start()
    +        }
    +        val secret = if (isBarrier) {
    +          authHelper.secret
    +        } else {
    +          ""
    +        }
    +        // Close ServerSocket on task completion.
    +        serverSocket.foreach { server =>
    +          context.addTaskCompletionListener(_ => server.close())
    --- End diff --
    
    Addressed in https://github.com/apache/spark/pull/22229


---



[GitHub] spark pull request #22085: [WIP][SPARK-25095][PySpark] Python support for Ba...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r210963181
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -381,6 +421,45 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
           }
         }
       }
    +
    +  /**
    +   * Gateway to call BarrierTaskContext.barrier().
    +   */
    +  def barrierAndServe(): Unit = {
    --- End diff --
    
    It's not clear yet how to trigger this.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r212755510
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(/* port */ 0,
    +            /* backlog */ 1,
    +            InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  // Wait for function call from python side.
    +                  sock.setSoTimeout(10000)
    +                  val input = new DataInputStream(sock.getInputStream())
    +                  input.readInt() match {
    +                    case BarrierTaskContextMessageProtocol.BARRIER_FUNCTION =>
    +                      // The barrier() function may wait infinitely, socket shall not timeout
    +                      // before the function finishes.
    +                      sock.setSoTimeout(0)
    +                      barrierAndServe(sock)
    +
    +                    case _ =>
    +                      val out = new DataOutputStream(new BufferedOutputStream(
    +                        sock.getOutputStream))
    +                      writeUTF(BarrierTaskContextMessageProtocol.ERROR_UNRECOGNIZED_FUNCTION, out)
    +                  }
    +                } catch {
    +                  case e: SocketException if e.getMessage.contains("Socket closed") =>
    +                    // It is possible that the ServerSocket is not closed, but the native socket
    +                    // has already been closed, we shall catch and silently ignore this case.
    +                } finally {
    +                  if (sock != null) {
    +                    sock.close()
    +                  }
    +                }
    +              }
    +            }
    +          }.start()
    +        }
    +        val secret = if (isBarrier) {
    +          authHelper.secret
    +        } else {
    +          ""
    +        }
    +        // Close ServerSocket on task completion.
    +        serverSocket.foreach { server =>
    +          context.addTaskCompletionListener(_ => server.close())
    --- End diff --
    
    This is failing the Scala 2.12 build
    
    ```
    [error] /Users/d_tsai/dev/apache-spark/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala:242: ambiguous reference to overloaded definition,
    [error] both method addTaskCompletionListener in class TaskContext of type [U](f: org.apache.spark.TaskContext => U)org.apache.spark.TaskContext
    [error] and  method addTaskCompletionListener in class TaskContext of type (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
    [error] match argument types (org.apache.spark.TaskContext => Unit)
    [error]           context.addTaskCompletionListener(_ => server.close())
    [error]                   ^
    [error] one error found
    [error] Compile failed at Aug 24, 2018 1:56:06 PM [31.582s]
    ```


---



[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2311/
    Test PASSed.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209473946
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    --- End diff --
    
    Leave a TODO here. We do not have requests from Java to Python.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #95037 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95037/testReport)** for PR 22085 at commit [`2a8f3cb`](https://github.com/apache/spark/commit/2a8f3cb210d78afa862a3a51bb8fb7b4e4c54623).


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r213032992
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(/* port */ 0,
    +            /* backlog */ 1,
    +            InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  // Wait for function call from python side.
    +                  sock.setSoTimeout(10000)
    +                  val input = new DataInputStream(sock.getInputStream())
    --- End diff --
    
    Why isn't authentication the first thing that happens on this connection? I don't think anything bad can happen in this case, but it makes it more likely that we leave a security hole here later on.
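    A minimal Python sketch of the "authenticate first" ordering, not the code in this PR. It assumes a simple framing (4-byte big-endian length followed by UTF-8 bytes) and uses hypothetical helper names `write_utf`, `read_utf`, `read_exact` and `serve_connection`. The server checks the shared secret before it reads any function code, so nothing is dispatched for an unauthenticated peer.

```python
import hmac
import socket
import struct


def write_utf(f, s):
    """Write a 4-byte big-endian length followed by the UTF-8 bytes, then flush."""
    data = s.encode("utf-8")
    f.write(struct.pack(">i", len(data)))
    f.write(data)
    f.flush()


def read_exact(f, n):
    """Read exactly n bytes or fail if the peer closed the connection early."""
    data = f.read(n)
    if data is None or len(data) != n:
        raise EOFError("connection closed while reading")
    return data


def read_utf(f):
    (length,) = struct.unpack(">i", read_exact(f, 4))
    return read_exact(f, length).decode("utf-8")


def serve_connection(sock, secret, handlers):
    """Authenticate first; only afterwards read and dispatch a 4-byte function code."""
    f = sock.makefile("rwb")
    try:
        client_secret = read_utf(f)
        # Constant-time comparison of the shared secret.
        if not hmac.compare_digest(client_secret.encode("utf-8"), secret.encode("utf-8")):
            write_utf(f, "err")
            return False
        write_utf(f, "ok")
        (code,) = struct.unpack(">i", read_exact(f, 4))
        handler = handlers.get(code)
        write_utf(f, handler() if handler is not None else "unrecognized function")
        return True
    finally:
        f.close()
```

    With this ordering, any future handler added to `handlers` is only reachable after the secret check has passed.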


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209834646
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    +              secret)
    +            .build())
    --- End diff --
    
    BTW, this is a design change. We should probably update https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals too. Is it really necessary to open a gateway there?


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211357983
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +190,61 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(0, 1, InetAddress.getByName("localhost")))
    --- End diff --
    
    minor: it would be useful to add `/* port */` and `/* backlog */` comments.
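    For readers less familiar with the `ServerSocket(0, 1, ...)` arguments: port 0 asks the OS for any free ephemeral port, and backlog 1 is a hint to queue at most about one pending connection. A rough Python analogue with the standard `socket` module (not code from the PR; `open_local_server` is a hypothetical name, and binding to `127.0.0.1` is an assumption, since Java's `InetAddress.getByName("localhost")` may also resolve to `::1`):

```python
import socket


def open_local_server():
    """Listen on an OS-assigned port on localhost and return (server, port)."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Port 0: let the OS pick a free ephemeral port.
    server.bind(("127.0.0.1", 0))
    # Backlog 1: a hint for the pending-connection queue; the kernel may round it.
    server.listen(1)
    # The chosen port has to be read back and sent to the client (the Python worker).
    port = server.getsockname()[1]
    return server, port
```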


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #22085: [WIP][SPARK-25095][PySpark] Python support for Ba...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r210963511
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +99,124 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +def _load_from_socket(port, auth_secret):
    +    """
    +    Load data from a given socket, this is a blocking method thus only return when the socket
    +    connection has been closed.
    +    """
    +    sock = None
    +    # Support for both IPv4 and IPv6.
    +    # On most of IPv6-ready systems, IPv6 will take precedence.
    +    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
    +        af, socktype, proto, canonname, sa = res
    +        sock = socket.socket(af, socktype, proto)
    +        try:
    +            # Do not allow timeout for socket reading operation.
    +            sock.settimeout(None)
    +            sock.connect(sa)
    +        except socket.error:
    +            sock.close()
    +            sock = None
    +            continue
    +        break
    +    if not sock:
    +        raise Exception("could not open socket")
    +
    +    sockfile = sock.makefile("rwb", 65536)
    +    do_server_auth(sockfile, auth_secret)
    +
    +    # The socket will be automatically closed when garbage-collected.
    +    return UTF8Deserializer().loads(sockfile)
    +
    +
    +class BarrierTaskContext(TaskContext):
    +
    +    """
    +    .. note:: Experimental
    +
    +    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
    +    for a running task, use:
    +    L{BarrierTaskContext.get()}.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    _port = None
    +    _secret = None
    +
    +    def __init__(self):
    +        """Construct a BarrierTaskContext, use get instead"""
    +        pass
    +
    +    @classmethod
    +    def _getOrCreate(cls):
    +        """Internal function to get or create global BarrierTaskContext."""
    +        if cls._taskContext is None:
    +            cls._taskContext = BarrierTaskContext()
    +        return cls._taskContext
    +
    +    @classmethod
    +    def get(cls):
    +        """
    +        Return the currently active BarrierTaskContext. This can be called inside of user functions
    +        to access contextual information about running tasks.
    +
    +        .. note:: Must be called on the worker, not the driver. Returns None if not initialized.
    +        """
    +        return cls._taskContext
    +
    +    @classmethod
    +    def _initialize(cls, port, secret):
    +        """
    +        Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called
    +        after BarrierTaskContext is initialized.
    +        """
    +        cls._port = port
    +        cls._secret = secret
    +
    +    def barrier(self):
    +        """
    +        .. note:: Experimental
    +
    +        Sets a global barrier and waits until all tasks in this stage hit this barrier.
    +        Note this method is only allowed for a BarrierTaskContext.
    +
    +        .. versionadded:: 2.4.0
    +        """
    +        if self._port is None or self._secret is None:
    +            raise Exception("Not supported to call barrier() before initialize " +
    +                            "BarrierTaskContext.")
    +        else:
    +            _load_from_socket(self._port, self._secret)
    +
    +    def getTaskInfos(self):
    --- End diff --
    
    This is temporarily unavailable.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #94738 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94738/testReport)** for PR 22085 at commit [`05c9609`](https://github.com/apache/spark/commit/05c9609e70d9fd9e40de599bc178b47bedb01a56).


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94650/
    Test PASSed.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #94951 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94951/testReport)** for PR 22085 at commit [`ba0ccad`](https://github.com/apache/spark/commit/ba0ccad6e074b378cfd454eeb90f8e6da1e321c6).


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2173/
    Test PASSed.


---



[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Thanks, @jiangxb1987 and @mengxr again.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    @HyukjinKwon Thanks for the feedback! We will replace the Py4J route with a special implementation that can only trigger `context.barrier()` in the JVM.
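    The restricted approach described here can be pictured as a fixed allowlist: the JVM side maps a small integer code to exactly one action and rejects everything else, instead of exposing a whole object through a Py4J gateway. A Python sketch of that dispatch step; `BARRIER_FUNCTION`, `ERROR_UNRECOGNIZED_FUNCTION` and `serve_request` are hypothetical names that only echo the constants in the accept loop quoted earlier in this thread, and the reply framing (4-byte length plus UTF-8) is an assumption.

```python
import io
import struct

BARRIER_FUNCTION = 1  # hypothetical code
ERROR_UNRECOGNIZED_FUNCTION = "unrecognized function"  # hypothetical message


def serve_request(inp, out, barrier):
    """Read one 4-byte function code and run it only if it is on the allowlist."""
    allowed = {BARRIER_FUNCTION: barrier}  # the only callable entry point
    header = inp.read(4)
    if len(header) != 4:
        raise EOFError("no function code received")
    (code,) = struct.unpack(">i", header)
    action = allowed.get(code)
    reply = action() if action is not None else ERROR_UNRECOGNIZED_FUNCTION
    data = reply.encode("utf-8")
    out.write(struct.pack(">i", len(data)) + data)
    return reply
```

    Unlike a gateway, an unknown code cannot reach any other JVM method; it only produces an error reply.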


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209862546
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +95,92 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +class BarrierTaskContext(TaskContext):
    +
    +    """
    +    .. note:: Experimental
    +
    +    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
    +    for a running task, use:
    +    L{BarrierTaskContext.get()}.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    _barrierContext = None
    +
    +    def __init__(self):
    +        """Construct a BarrierTaskContext, use get instead"""
    +        pass
    --- End diff --
    
    I'm okay as is.


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209464591
  
    --- Diff: python/pyspark/worker.py ---
    @@ -275,6 +280,10 @@ def main(infile, outfile):
             shuffle.DiskBytesSpilled = 0
             _accumulatorRegistry.clear()
     
    +        if isBarrier:
    +            paras = GatewayParameters(port=boundPort, auth_token=secret, auto_convert=True)
    --- End diff --
    
    Maybe `params` instead of `paras`?


---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209833972
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    +              secret)
    +            .build())
    --- End diff --
    
    Mainly, the concerns are resource usage, an unusual access pattern via Py4J on the worker side, and the possibility of JVM access from within the Python worker.
    
    Launching a Java gateway just to allow calling a single function looks like overkill, judging from https://github.com/apache/spark/pull/22085#discussion_r209490553. This pattern is pretty unusual: in such cases we usually send the data manually and read it on the Python side, as with `TaskContext`. Now, if I am not mistaken, it opens a gateway for each worker.
    
    I was wondering whether we can avoid this. Can you elaborate on why this is necessary? I haven't had enough time to look into it, so I was planning to take a look this weekend.
    
    This also opens the possibility of JVM access from the worker side via `BarrierTaskContext`. For instance, I believe one could hack around it and access the JVM inside UDFs.
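    "Send the data manually and read it on the Python side" refers to the plain stream framing already used in the diff (`dataOut.writeInt(partitionIndex)`, `PythonRDD.writeUTF(pythonVer, dataOut)`). A small Python sketch of that framing; the helper names are hypothetical, the int layout matches `java.io.DataOutputStream.writeInt` (4 bytes, big-endian), and the string layout (4-byte length, then UTF-8 bytes) is our understanding of `PythonRDD.writeUTF`, stated here as an assumption.

```python
import io
import struct


def write_int(out, value):
    # Same byte layout as java.io.DataOutputStream.writeInt: 4 bytes, big-endian.
    out.write(struct.pack(">i", value))


def write_utf(out, s):
    # Assumed layout: 4-byte big-endian byte length, then the UTF-8 bytes.
    data = s.encode("utf-8")
    write_int(out, len(data))
    out.write(data)


def read_int(inp):
    return struct.unpack(">i", inp.read(4))[0]


def read_utf(inp):
    return inp.read(read_int(inp)).decode("utf-8")
```

    No gateway is involved: both sides only agree on byte layouts, so the Python worker gets values, not a handle into the JVM.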
    



---



[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r210148558
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    +              secret)
    +            .build())
    --- End diff --
    
    Not yet. I asked to hold it back for now because another gateway here looks like a last resort, and I was wondering whether we could avoid targeting 2.4.0 with it. If this is blocking, please go ahead. I will check it this weekend.




[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    @mengxr, let me leave this link here again in case it was missed because the thread is folded: https://github.com/apache/spark/pull/22085#discussion_r209491060




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209473887
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +96,33 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +    def barrier(self):
    --- End diff --
    
    Create a `BarrierTaskContext` class that extends `TaskContext`, and move those two methods there.




[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94901/
    Test FAILed.




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209464569
  
    --- Diff: python/pyspark/worker.py ---
    @@ -261,6 +263,9 @@ def main(infile, outfile):
     
             # initialize global state
             taskContext = TaskContext._getOrCreate()
    +        isBarrier = read_bool(infile)
    --- End diff --
    
    Could you add a comment indicating that the following 3 inputs are only for barrier tasks?
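
    A sketch of how such a comment and the conditional use could look, assuming (hypothetically) that the JVM always writes a boolean flag, a port, and a secret, so that the stream layout does not depend on the task type. The names `TaskSetup`, `read_task_setup`, and `encode_setup` are illustrative only; the real field order is whatever `PythonRunner` writes.

```python
import io
import struct
from collections import namedtuple

# Hypothetical container for what the worker learns about the task.
TaskSetup = namedtuple("TaskSetup", ["is_barrier", "port", "secret"])


def read_bool(stream):
    # DataOutputStream.writeBoolean writes a single byte (0 or 1).
    return struct.unpack("!?", stream.read(1))[0]


def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]


def read_utf(stream):
    length = read_int(stream)
    return stream.read(length).decode("utf-8")


def read_task_setup(infile):
    # The following 3 inputs are only meaningful for barrier tasks; they are
    # always present so the stream layout stays the same for every task.
    is_barrier = read_bool(infile)
    port = read_int(infile)
    secret = read_utf(infile)
    if not is_barrier:
        return TaskSetup(False, None, None)
    return TaskSetup(True, port, secret)


def encode_setup(is_barrier, port, secret):
    # Test helper standing in for the JVM side of the stream.
    raw = secret.encode("utf-8")
    return struct.pack("!?i", is_barrier, port) + struct.pack("!i", len(raw)) + raw


barrier_setup = read_task_setup(io.BytesIO(encode_setup(True, 51234, "abc")))
plain_setup = read_task_setup(io.BytesIO(encode_setup(False, 0, "")))
```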




[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2104/
    Test PASSed.




[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2387/
    Test PASSed.




[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #94650 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94650/testReport)** for PR 22085 at commit [`7b48829`](https://github.com/apache/spark/commit/7b488299709f715d344e5c38956577f31718ab34).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211182337
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +99,124 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +def _load_from_socket(port, auth_secret):
    +    """
    +    Load data from a given socket, this is a blocking method thus only return when the socket
    +    connection has been closed.
    +    """
    +    sock = None
    +    # Support for both IPv4 and IPv6.
    +    # On most of IPv6-ready systems, IPv6 will take precedence.
    +    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
    +        af, socktype, proto, canonname, sa = res
    +        sock = socket.socket(af, socktype, proto)
    +        try:
    +            # Do not allow timeout for socket reading operation.
    +            sock.settimeout(None)
    +            sock.connect(sa)
    +        except socket.error:
    +            sock.close()
    +            sock = None
    +            continue
    +        break
    +    if not sock:
    +        raise Exception("could not open socket")
    +
    +    sockfile = sock.makefile("rwb", 65536)
    +    do_server_auth(sockfile, auth_secret)
    +
    +    # The socket will be automatically closed when garbage-collected.
    +    return UTF8Deserializer().loads(sockfile)
    +
    +
    +class BarrierTaskContext(TaskContext):
    +
    +    """
    +    .. note:: Experimental
    +
    +    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
    +    for a running task, use:
    +    L{BarrierTaskContext.get()}.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    _port = None
    +    _secret = None
    +
    +    def __init__(self):
    +        """Construct a BarrierTaskContext, use get instead"""
    +        pass
    +
    +    @classmethod
    +    def _getOrCreate(cls):
    +        """Internal function to get or create global BarrierTaskContext."""
    +        if cls._taskContext is None:
    +            cls._taskContext = BarrierTaskContext()
    +        return cls._taskContext
    +
    +    @classmethod
    +    def get(cls):
    +        """
    +        Return the currently active BarrierTaskContext. This can be called inside of user functions
    +        to access contextual information about running tasks.
    +
    +        .. note:: Must be called on the worker, not the driver. Returns None if not initialized.
    +        """
    +        return cls._taskContext
    +
    +    @classmethod
    +    def _initialize(cls, port, secret):
    +        """
    +        Initialize BarrierTaskContext, other methods within BarrierTaskContext can only be called
    +        after BarrierTaskContext is initialized.
    +        """
    +        cls._port = port
    +        cls._secret = secret
    +
    +    def barrier(self):
    +        """
    +        .. note:: Experimental
    +
    +        Sets a global barrier and waits until all tasks in this stage hit this barrier.
    +        Note this method is only allowed for a BarrierTaskContext.
    +
    +        .. versionadded:: 2.4.0
    +        """
    +        if self._port is None or self._secret is None:
    +            raise Exception("Not supported to call barrier() before initialize " +
    +                            "BarrierTaskContext.")
    +        else:
    +            _load_from_socket(self._port, self._secret)
    +
    +    def getTaskInfos(self):
    --- End diff --
    
    fixed




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211460405
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -20,15 +20,16 @@ package org.apache.spark.api.python
     import java.io._
     import java.net._
     import java.nio.charset.StandardCharsets
    +import java.nio.charset.StandardCharsets.UTF_8
     import java.util.concurrent.atomic.AtomicBoolean
     
     import scala.collection.JavaConverters._
     
    -import org.apache.spark._
    +import org.apache.spark.{SparkException, _}
     import org.apache.spark.internal.Logging
    +import org.apache.spark.security.SocketAuthHelper
     import org.apache.spark.util._
     
    -
    --- End diff --
    
    tiny nit: I would restore this removed blank line while addressing the other comments.




[GitHub] spark issue #22085: [WIP][SPARK-25095][PySpark] Python support for BarrierTa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94942/
    Test FAILed.




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r213060868
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(/* port */ 0,
    +            /* backlog */ 1,
    +            InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  // Wait for function call from python side.
    +                  sock.setSoTimeout(10000)
    +                  val input = new DataInputStream(sock.getInputStream())
    --- End diff --
    
    OK, I'm doing this in SPARK-25253 and will open a PR shortly.




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209974729
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    +              secret)
    +            .build())
    --- End diff --
    
    The major issue here is that we want the `barrier()` call to be blocking: the task should wait until it either times out or succeeds. Are there other ways to achieve this besides the current approach?
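
    One way to get a blocking call without a gateway, and roughly the shape the later `ServerSocket` revision in this thread appears to take, is for Python to send a request over a local socket and block on the reply. A minimal sketch, with a thread standing in for the JVM side; the `b"barrier"` request, the `b"ok"` reply, and all function names are assumptions.

```python
import socket
import threading
import time


def recv_exact(sock, n):
    # Keep reading until n bytes arrive or the peer closes the connection.
    data = b""
    while len(data) < n:
        chunk = sock.recv(n - len(data))
        if not chunk:
            break
        data += chunk
    return data


def fake_jvm_side(sock, delay_seconds):
    # Stand-in for the JVM: read the request, wait as if for every task to
    # reach the barrier, then release the caller.
    request = recv_exact(sock, len(b"barrier"))
    time.sleep(delay_seconds)
    sock.sendall(b"ok" if request == b"barrier" else b"no")


def call_barrier(sock, timeout_seconds=None):
    # None blocks indefinitely, like sock.settimeout(None) in the diff;
    # a number turns a long wait into socket.timeout instead.
    sock.settimeout(timeout_seconds)
    sock.sendall(b"barrier")
    try:
        reply = recv_exact(sock, 2)
    except socket.timeout:
        return "timed out"
    return "released" if reply == b"ok" else "failed"


def run(delay_seconds, timeout_seconds):
    py_sock, jvm_sock = socket.socketpair()
    jvm = threading.Thread(target=fake_jvm_side, args=(jvm_sock, delay_seconds))
    jvm.start()
    try:
        return call_barrier(py_sock, timeout_seconds)
    finally:
        jvm.join()
        py_sock.close()
        jvm_sock.close()


print(run(0.1, None))
print(run(0.5, 0.05))
```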




[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2318/
    Test PASSed.




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r211360719
  
    --- Diff: python/pyspark/taskcontext.py ---
    @@ -95,3 +99,126 @@ def getLocalProperty(self, key):
             Get a local property set upstream in the driver, or None if it is missing.
             """
             return self._localProperties.get(key, None)
    +
    +
    +def _load_from_socket(port, auth_secret):
    +    """
    +    Load data from a given socket, this is a blocking method thus only return when the socket
    +    connection has been closed.
    +    """
    +    sock = None
    +    # Support for both IPv4 and IPv6.
    +    # On most of IPv6-ready systems, IPv6 will take precedence.
    +    for res in socket.getaddrinfo("localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
    +        af, socktype, proto, canonname, sa = res
    +        sock = socket.socket(af, socktype, proto)
    +        try:
    +            # Do not allow timeout for socket reading operation.
    +            sock.settimeout(None)
    +            sock.connect(sa)
    +        except socket.error:
    +            sock.close()
    +            sock = None
    +            continue
    +        break
    +    if not sock:
    +        raise Exception("could not open socket")
    +
    +    sockfile = sock.makefile("rwb", 65536)
    +    write_with_length("run".encode("utf-8"), sockfile)
    +    sockfile.flush()
    +    do_server_auth(sockfile, auth_secret)
    +
    +    # The socket will be automatically closed when garbage-collected.
    +    return UTF8Deserializer().loads(sockfile)
    +
    +
    +class BarrierTaskContext(TaskContext):
    +
    +    """
    +    .. note:: Experimental
    +
    +    A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext
    +    for a running task, use:
    +    L{BarrierTaskContext.get()}.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    _port = None
    +    _secret = None
    +
    +    def __init__(self):
    +        """Construct a BarrierTaskContext, use get instead"""
    +        pass
    +
    +    @classmethod
    +    def _getOrCreate(cls):
    +        """Internal function to get or create global BarrierTaskContext."""
    +        if cls._taskContext is None:
    --- End diff --
    
    Q: Does it handle Python worker reuse?
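
    With worker reuse, one Python process runs many tasks, so class-level state outlives a single task. A simplified model, assuming the base `TaskContext` keeps its singleton in a class attribute `_taskContext` with the same `_getOrCreate` shape as the subclass in the diff: because `cls._taskContext` falls back to the inherited attribute, a plain context left over from an earlier task is returned unchanged. The classes below are stand-ins, and `FixedBarrierTaskContext` is a hypothetical fix, not code from this PR.

```python
class TaskContext(object):
    # Simplified stand-in: only the singleton bookkeeping is modeled.
    _taskContext = None

    @classmethod
    def _getOrCreate(cls):
        if cls._taskContext is None:
            cls._taskContext = TaskContext()
        return cls._taskContext


class BarrierTaskContext(TaskContext):
    @classmethod
    def _getOrCreate(cls):
        # Same shape as the diff: when the subclass has no attribute of its
        # own, cls._taskContext resolves to the one inherited from TaskContext.
        if cls._taskContext is None:
            cls._taskContext = BarrierTaskContext()
        return cls._taskContext


class FixedBarrierTaskContext(TaskContext):
    @classmethod
    def _getOrCreate(cls):
        # Hypothetical fix: replace any cached context of the wrong type.
        if not isinstance(cls._taskContext, cls):
            cls._taskContext = cls()
        return cls._taskContext


# Task 1 in a reused worker is a normal task; task 2 is a barrier task.
first = TaskContext._getOrCreate()
stale = BarrierTaskContext._getOrCreate()
fixed = FixedBarrierTaskContext._getOrCreate()
print(type(stale).__name__)  # TaskContext
print(type(fixed).__name__)  # FixedBarrierTaskContext
```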




[GitHub] spark pull request #22085: [SPARK-25095][PySpark] Python support for Barrier...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r209830941
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +183,42 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a GatewayServer to port current BarrierTaskContext to Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        val secret = if (isBarrier) {
    +          Utils.createSecret(env.conf)
    +        } else {
    +          ""
    +        }
    +        val gatewayServer: Option[GatewayServer] = if (isBarrier) {
    +          Some(new GatewayServer.GatewayServerBuilder()
    +            .entryPoint(context.asInstanceOf[BarrierTaskContext])
    +            .authToken(secret)
    +            .javaPort(0)
    +            .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, GatewayServer.defaultAddress(),
    +              secret)
    +            .build())
    --- End diff --
    
    @HyukjinKwon Could you elaborate on your concerns? Is it about resource usage or security?




[GitHub] spark issue #22085: [SPARK-25095][PySpark] Python support for BarrierTaskCon...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22085
  
    **[Test build #94681 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94681/testReport)** for PR 22085 at commit [`289146d`](https://github.com/apache/spark/commit/289146d9e2533d1a48535288a0d5c8f1d3b51f14).

