Posted to reviews@spark.apache.org by vanzin <gi...@git.apache.org> on 2015/11/07 00:17:59 UTC

[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

GitHub user vanzin opened a pull request:

    https://github.com/apache/spark/pull/9530

    [SPARK-11140] [core] Transfer files using network lib when using NettyRpcEnv.

    This change abstracts the code that serves jars / files to executors so that
    each RpcEnv can have its own implementation; the akka version uses the existing
    HTTP-based file serving mechanism, while the netty version uses the new
    stream support added to the network lib, which makes file transfers benefit
    from the easier security configuration of the network library, and should also
    reduce overhead overall.
    
    The change includes a small fix to TransportChannelHandler so that it propagates
    user events to downstream handlers.
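    The split described above (one file-serving implementation per RpcEnv) can be pictured as a single interface with two backends. This is a minimal sketch under stated assumptions: the trait, both classes, `lookup`, and the exact URI shapes (including the `spark://` scheme) are illustrative and are not the code in this PR. The netty-side stand-in only shows how the URI path can double as the stream id a client later requests.

```scala
import java.io.File
import scala.collection.concurrent.TrieMap

// Hypothetical sketch; these names are illustrative and are not Spark's API.
trait FileServerSketch {
  /** Registers a local file and returns a URI that executors can fetch it from. */
  def addFile(file: File): String
}

// Stand-in for the existing HTTP-based mechanism kept by the akka RpcEnv.
// It only builds the URI; a real server would also serve the bytes.
final class HttpFileServerSketch(host: String, port: Int) extends FileServerSketch {
  def addFile(file: File): String = s"http://$host:$port/files/${file.getName}"
}

// Stand-in for the netty RpcEnv: files are served from the RPC port itself,
// and the URI path doubles as the stream id a client asks for.
final class StreamFileServerSketch(host: String, port: Int) extends FileServerSketch {
  private val files = TrieMap.empty[String, File]

  def addFile(file: File): String = {
    // putIfAbsent returns the previously registered file, if any.
    files.putIfAbsent(file.getName, file).foreach { existing =>
      require(existing == file, s"${file.getName} is already registered from another path.")
    }
    s"spark://$host:$port/files/${file.getName}"
  }

  /** Resolves a stream id such as "/files/app.jar" to the registered file. */
  def lookup(streamId: String): Option[File] =
    files.get(streamId.stripPrefix("/files/"))
}
```

    Callers only see `addFile` and a URI, so which transport actually moves the bytes stays an RpcEnv detail.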

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-11140

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9530.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9530
    
----
commit 3f8209fe664836242ab849d4e8503c4443798556
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2015-10-18T00:09:21Z

    [SPARK-11140] [core] Transfer files using network lib when using NettyRpcEnv.
    
    This change abstracts the code that serves jars / files to executors so that
    each RpcEnv can have its own implementation; the akka version uses the existing
    HTTP-based file serving mechanism, while the netty version uses the new
    stream support added to the network lib, which makes file transfers benefit
    from the easier security configuration of the network library, and should also
    reduce overhead overall.
    
    The change includes a small fix to TransportChannelHandler so that it propagates
    user events to downstream handlers.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-156601039
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45904/
    Test FAILed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155574987
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45540/
    Test PASSed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155202070
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154753512
  
    **[Test build #45292 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45292/consoleFull)** for PR 9530 at commit [`3f8209f`](https://github.com/apache/spark/commit/3f8209fe664836242ab849d4e8503c4443798556).


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155160617
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155568326
  
    **[Test build #45547 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45547/consoleFull)** for PR 9530 at commit [`71ac0cf`](https://github.com/apache/spark/commit/71ac0cfdb25d7bb29e180c704a8eac840c7fccd4).


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155269630
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155166742
  
    Merged build triggered.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155160422
  
    **[Test build #45383 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45383/consoleFull)** for PR 9530 at commit [`87b7a91`](https://github.com/apache/spark/commit/87b7a9174be76646ef814b42dc7d0172ccc23552).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `public class JavaAFTSurvivalRegressionExample`


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44350637
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -501,13 +625,15 @@ private[netty] class NettyRpcHandler(
         dispatcher.postRemoteMessage(messageToDispatch, callback)
       }
     
    -  override def getStreamManager: StreamManager = new OneForOneStreamManager
    +  override def getStreamManager: StreamManager = streamManager
     
       override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = {
         val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
         if (addr != null) {
    -      val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
    --- End diff --
    
    You added a new check, `if (clients.containsKey(client)) {`. There was similar logic here: https://github.com/apache/spark/pull/9210/files#diff-0c89b4a60c30a7cd2224bb64d93da942L477, but you removed it. Now you are adding it back.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44361876
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -302,6 +325,138 @@ private[netty] class NettyRpcEnv(
         }
       }
     
    +  override def fileServer: RpcEnvFileServer = streamManager
    +
    +  override def openChannel(uri: String): ReadableByteChannel = {
    +    val parsedUri = new URI(uri)
    +    require(parsedUri.getHost() != null, "Host name must be defined.")
    +    require(parsedUri.getPort() > 0, "Port must be defined.")
    +    require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty, "Path must be defined.")
    +
    +    val pipe = Pipe.open()
    +    val source = new FileDownloadChannel(pipe.source())
    +    try {
    +      val client = fileDownloadClient(parsedUri.getHost(), parsedUri.getPort())
    +      val callback = new FileDownloadCallback(pipe.sink(), source, client)
    +      client.stream(parsedUri.getPath(), callback)
    +    } catch {
    +      case e: Exception =>
    +        pipe.sink().close()
    +        source.close()
    +        throw e
    +    }
    +
    +    source
    +  }
    +
    +  private def fileDownloadClient(host: String, port: Int): TransportClient = synchronized {
    --- End diff --
    
    Just found another case: `fetchFiles` is also used in `DriverRunner` in the `Worker`.
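    The `openChannel` hunk quoted in this message returns the read end of a `java.nio.channels.Pipe` and passes the write end to `FileDownloadCallback`, which presumably copies incoming stream data into it. A self-contained sketch of that hand-off, assuming a background thread in place of the network library's callback (`PipeDownloadSketch`, `openChannel` and `readAll` here are hypothetical helpers, not Spark code):

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, ReadableByteChannel}

object PipeDownloadSketch {
  /**
   * Hands the caller the read end of a pipe. A background thread stands in for
   * the network library's stream callback: it writes each chunk into the write
   * end and closes it when the "download" finishes, which the reader sees as EOF.
   */
  def openChannel(chunks: Seq[Array[Byte]]): ReadableByteChannel = {
    val pipe = Pipe.open()
    val writer = new Thread(() => {
      val sink = pipe.sink()
      try {
        chunks.foreach { chunk =>
          val buf = ByteBuffer.wrap(chunk)
          while (buf.hasRemaining) sink.write(buf) // analogous to receiving a data chunk
        }
      } finally {
        sink.close() // analogous to stream completion: the reader gets -1
      }
    })
    writer.setDaemon(true)
    writer.start()
    pipe.source()
  }

  /** Drains a channel to end-of-stream, as a caller copying the file to disk would. */
  def readAll(ch: ReadableByteChannel): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val buf = ByteBuffer.allocate(8192)
    try {
      while (ch.read(buf) != -1) {
        buf.flip()
        out.write(buf.array(), 0, buf.limit())
        buf.clear()
      }
    } finally ch.close()
    out.toByteArray
  }
}
```

    The error path in the hunk follows the same logic: if creating the client or starting the stream throws, both pipe ends are closed, so a reader is never left blocked on a channel nobody will write to.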


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155197443
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44349342
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -501,13 +625,15 @@ private[netty] class NettyRpcHandler(
         dispatcher.postRemoteMessage(messageToDispatch, callback)
       }
     
    -  override def getStreamManager: StreamManager = new OneForOneStreamManager
    +  override def getStreamManager: StreamManager = streamManager
     
       override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = {
         val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
         if (addr != null) {
    -      val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
    --- End diff --
    
    Why revert this back to the old code?


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155166367
  
    retest this please


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155610649
  
    **[Test build #45552 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45552/consoleFull)** for PR 9530 at commit [`11f61a8`](https://github.com/apache/spark/commit/11f61a88e4a9a1a1a17572c8579143fc856667d9).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44361246
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -302,6 +325,138 @@ private[netty] class NettyRpcEnv(
         }
       }
     
    +  override def fileServer: RpcEnvFileServer = streamManager
    +
    +  override def openChannel(uri: String): ReadableByteChannel = {
    +    val parsedUri = new URI(uri)
    +    require(parsedUri.getHost() != null, "Host name must be defined.")
    +    require(parsedUri.getPort() > 0, "Port must be defined.")
    +    require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty, "Path must be defined.")
    +
    +    val pipe = Pipe.open()
    +    val source = new FileDownloadChannel(pipe.source())
    +    try {
    +      val client = fileDownloadClient(parsedUri.getHost(), parsedUri.getPort())
    +      val callback = new FileDownloadCallback(pipe.sink(), source, client)
    +      client.stream(parsedUri.getPath(), callback)
    +    } catch {
    +      case e: Exception =>
    +        pipe.sink().close()
    +        source.close()
    +        throw e
    +    }
    +
    +    source
    +  }
    +
    +  private def fileDownloadClient(host: String, port: Int): TransportClient = synchronized {
    --- End diff --
    
    Downloading from different hosts means different clients. But different tasks will still all download from the same host (the driver).
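    The point here is that download clients are cached per remote address, so the number of connections grows with the number of distinct hosts, not with the number of tasks. A hedged sketch of that caching rule, modeled on the `fileClients.get(address).filter(_.isActive()).getOrElse(...)` lookup that appears in a later revision of this hunk; `FakeClient` and `DownloadClientCache` are hypothetical stand-ins for `TransportClient` and the RpcEnv's state:

```scala
import scala.collection.mutable

// Hypothetical stand-in for a transport client; it only tracks liveness.
final class FakeClient(val host: String, val port: Int) {
  @volatile private var active = true
  def isActive: Boolean = active
  def close(): Unit = active = false
}

/**
 * One cached client per (host, port). Tasks that all fetch from the driver
 * share a single connection; a dead client is replaced on the next request.
 */
final class DownloadClientCache(connect: (String, Int) => FakeClient) {
  private val clients = mutable.Map.empty[(String, Int), FakeClient]

  def clientFor(host: String, port: Int): FakeClient = synchronized {
    val key = (host, port)
    clients.get(key).filter(_.isActive).getOrElse {
      val c = connect(host, port)
      clients.put(key, c)
      c
    }
  }
}
```

    With every executor task pulling jars from the driver, this cache typically holds a single entry per executor.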


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155579680
  
    **[Test build #45552 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45552/consoleFull)** for PR 9530 at commit [`11f61a8`](https://github.com/apache/spark/commit/11f61a88e4a9a1a1a17572c8579143fc856667d9).


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155197372
  
    Merged build triggered.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154771670
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155133513
  
    Merged build triggered.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155610925
  
    Merged build triggered.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155567266
  
    Merged build triggered.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155613207
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44465685
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -302,6 +325,138 @@ private[netty] class NettyRpcEnv(
         }
       }
     
    +  override def fileServer: RpcEnvFileServer = streamManager
    +
    +  override def openChannel(uri: String): ReadableByteChannel = {
    +    val parsedUri = new URI(uri)
    +    require(parsedUri.getHost() != null, "Host name must be defined.")
    +    require(parsedUri.getPort() > 0, "Port must be defined.")
    +    require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty, "Path must be defined.")
    +
    +    val pipe = Pipe.open()
    +    val source = new FileDownloadChannel(pipe.source())
    +    try {
    +      val client = fileDownloadClient(parsedUri.getHost(), parsedUri.getPort())
    +      val callback = new FileDownloadCallback(pipe.sink(), source, client)
    +      client.stream(parsedUri.getPath(), callback)
    +    } catch {
    +      case e: Exception =>
    +        pipe.sink().close()
    +        source.close()
    +        throw e
    +    }
    +
    +    source
    +  }
    +
    +  private def fileDownloadClient(host: String, port: Int): TransportClient = synchronized {
    +    if (stopped.get()) {
    +      throw new IllegalStateException("RpcEnv already stopped.")
    +    }
    +
    +    val address = RpcAddress(host, port)
    +    val client = fileClients.get(address).filter(_.isActive()).getOrElse(newDownloadClient(address))
    +
    +    // Tell the timeout handler this client is in use. This will prevent the handler from
    +    // closing the client if the timeout even triggers before data starts flowing for this
    +    // download.
    +    client.synchronized {
    +      val timeoutHandler = client.getChannel().pipeline().get(classOf[TimeoutHandler])
    +      timeoutHandler.setInUse(true)
    +
    +      // After notifying the timeout handler, check that the client is really active, and if not,
    +      // create a new one.
    +      if (client.isActive()) client else newDownloadClient(address)
    +    }
    +  }
    +
    +  /**
    +   * Create a new client and install a handler that will respond to IdleStateEvent. The events
    +   * are generated by the IdleStateHandler installed by the TransportContext when creating
    +   * clients, and the timeout value is controlled by the transport configuration.
    +   */
    +  private def newDownloadClient(addr: RpcAddress): TransportClient = {
    +    val c = clientFactory.createUnmanagedClient(addr.host, addr.port)
    +    c.getChannel().pipeline().addLast("rpcEnvTimeoutHandler", new TimeoutHandler(c))
    +    fileClients.put(addr, c)
    +    c
    +  }
    +
    +  private class TimeoutHandler(client: TransportClient) extends ChannelInboundHandlerAdapter {
    +
    +    @volatile private var inUse = true
    +
    +    def setInUse(inUse: Boolean): Unit = this.inUse = inUse
    +
    +    override def userEventTriggered(ctx: ChannelHandlerContext, evt: Object): Unit = {
    +      client.synchronized {
    +        if (!inUse && evt.isInstanceOf[IdleStateEvent] && client.isActive()) {
    +          logDebug(s"Closing transport client $client after idle timeout.")
    +          val socketAddr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
    +          val address = RpcAddress(socketAddr.getHostName(), socketAddr.getPort())
    +          ctx.close()
    +          NettyRpcEnv.this.synchronized {
    --- End diff --
    
    Good catch, fixed.
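    The hunk above relies on `fileDownloadClient` and `TimeoutHandler` both synchronizing on the client, so an idle event cannot close a connection between the moment a download marks it in use and the moment it checks `isActive()`. A simplified, Netty-free sketch of that handshake; `IdleCloser`, `onIdle` and `acquire` are hypothetical names, and a plain callback replaces `ctx.close()`:

```scala
// Hypothetical, Netty-free sketch of the idle-close handshake in TimeoutHandler.
final class IdleCloser(onClose: () => Unit) {
  @volatile private var inUse = true
  @volatile private var open = true

  def setInUse(v: Boolean): Unit = inUse = v
  def isOpen: Boolean = open

  /** Called when an idle event fires; returns true if the client was closed. */
  def onIdle(): Boolean = synchronized {
    if (!inUse && open) {
      open = false
      onClose()
      true
    } else false
  }

  /**
   * Called by a downloader before reusing the client. Marking it in use and
   * checking liveness under the same lock as onIdle() closes the race window;
   * false means "create a new client instead".
   */
  def acquire(): Boolean = synchronized {
    inUse = true
    open
  }
}
```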


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155613209
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45570/
    Test FAILed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44361532
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -302,6 +325,138 @@ private[netty] class NettyRpcEnv(
         }
       }
     
    +  override def fileServer: RpcEnvFileServer = streamManager
    +
    +  override def openChannel(uri: String): ReadableByteChannel = {
    +    val parsedUri = new URI(uri)
    +    require(parsedUri.getHost() != null, "Host name must be defined.")
    +    require(parsedUri.getPort() > 0, "Port must be defined.")
    +    require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty, "Path must be defined.")
    +
    +    val pipe = Pipe.open()
    +    val source = new FileDownloadChannel(pipe.source())
    +    try {
    +      val client = fileDownloadClient(parsedUri.getHost(), parsedUri.getPort())
    +      val callback = new FileDownloadCallback(pipe.sink(), source, client)
    +      client.stream(parsedUri.getPath(), callback)
    +    } catch {
    +      case e: Exception =>
    +        pipe.sink().close()
    +        source.close()
    +        throw e
    +    }
    +
    +    source
    +  }
    +
    +  private def fileDownloadClient(host: String, port: Int): TransportClient = synchronized {
    --- End diff --
    
    Oh, right. All tasks will download from the same driver.
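The three `require` checks at the top of `openChannel` in the hunk above reject any URI that lacks a host, an explicit port, or a non-empty path. The sketch below wraps the same checks in `Try` so you can see which inputs pass. `FileUris.validateFileUri` is a hypothetical name, and the example URIs are invented.

```scala
import java.net.URI
import scala.util.Try

object FileUris {
  // Hypothetical helper: the same three checks openChannel performs before it
  // opens a pipe or contacts the remote side. A failed require surfaces as a
  // Failure(IllegalArgumentException).
  def validateFileUri(uri: String): Try[URI] = Try {
    val parsed = new URI(uri)
    require(parsed.getHost() != null, "Host name must be defined.")
    require(parsed.getPort() > 0, "Port must be defined.")
    require(parsed.getPath() != null && parsed.getPath().nonEmpty, "Path must be defined.")
    parsed
  }
}
```

`java.net.URI` reports a missing port as -1 and a missing path as an empty string, so both of those cases fail here.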


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155610829
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45552/


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155244901
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155167098
  
    **[Test build #45402 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45402/consoleFull)** for PR 9530 at commit [`87b7a91`](https://github.com/apache/spark/commit/87b7a9174be76646ef814b42dc7d0172ccc23552).


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155610948
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155201831
  
    **[Test build #45402 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45402/consoleFull)** for PR 9530 at commit [`87b7a91`](https://github.com/apache/spark/commit/87b7a9174be76646ef814b42dc7d0172ccc23552).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155133542
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154753105
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154771659
  
    **[Test build #45293 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45293/consoleFull)** for PR 9530 at commit [`3f8209f`](https://github.com/apache/spark/commit/3f8209fe664836242ab849d4e8503c4443798556).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44453574
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -302,6 +325,138 @@ private[netty] class NettyRpcEnv(
         }
       }
     
    +  override def fileServer: RpcEnvFileServer = streamManager
    +
    +  override def openChannel(uri: String): ReadableByteChannel = {
    +    val parsedUri = new URI(uri)
    +    require(parsedUri.getHost() != null, "Host name must be defined.")
    +    require(parsedUri.getPort() > 0, "Port must be defined.")
    +    require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty, "Path must be defined.")
    +
    +    val pipe = Pipe.open()
    +    val source = new FileDownloadChannel(pipe.source())
    +    try {
    +      val client = fileDownloadClient(parsedUri.getHost(), parsedUri.getPort())
    +      val callback = new FileDownloadCallback(pipe.sink(), source, client)
    +      client.stream(parsedUri.getPath(), callback)
    +    } catch {
    +      case e: Exception =>
    +        pipe.sink().close()
    +        source.close()
    +        throw e
    +    }
    +
    +    source
    +  }
    +
    +  private def fileDownloadClient(host: String, port: Int): TransportClient = synchronized {
    --- End diff --
    
    On second thought, I think that's fine, since the downloads themselves still run in parallel.
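In `openChannel`, the caller receives the read end of a `java.nio.channels.Pipe`. The `FileDownloadCallback` is handed `pipe.sink()`, presumably so it can write incoming stream data into it. The actual transfer therefore happens outside the `synchronized` client lookup. Below is a minimal sketch of that producer/consumer shape, assuming a plain background thread in place of the network callback. `PipeDemo`, `openPipedChannel` and `readAll` are hypothetical names.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, ReadableByteChannel}

object PipeDemo {
  // Hypothetical stand-in for openChannel + FileDownloadCallback: a background
  // thread plays the role of the network callback and writes each chunk into
  // the pipe's sink; the caller only ever sees the source end.
  def openPipedChannel(chunks: Seq[Array[Byte]]): ReadableByteChannel = {
    val pipe = Pipe.open()
    val writer = new Thread(new Runnable {
      override def run(): Unit = {
        val sink = pipe.sink()
        try {
          chunks.foreach { chunk =>
            val buf = ByteBuffer.wrap(chunk)
            while (buf.hasRemaining) sink.write(buf)
          }
        } finally {
          sink.close() // closing the sink is what the reader sees as EOF
        }
      }
    })
    writer.setDaemon(true)
    writer.start()
    pipe.source()
  }

  // Drain a channel until EOF (read returns -1), then close it.
  def readAll(ch: ReadableByteChannel): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val buf = ByteBuffer.allocate(4096)
    try {
      while (ch.read(buf) >= 0) {
        buf.flip()
        out.write(buf.array(), 0, buf.limit())
        buf.clear()
      }
    } finally {
      ch.close()
    }
    out.toByteArray
  }
}
```

The diff also handles the failure path. If creating the client or starting the stream throws, it closes both the sink and the returned source, so no caller is left blocked on a pipe that nobody will write to.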


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155574831
  
    **[Test build #45540 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45540/consoleFull)** for PR 9530 at commit [`a89e665`](https://github.com/apache/spark/commit/a89e6655e74209d9b89f5dcb2b1b8ed8e3399d42).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155576989
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155166790
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155246626
  
    **[Test build #45446 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45446/consoleFull)** for PR 9530 at commit [`a222c03`](https://github.com/apache/spark/commit/a222c0344738ec1bd19e6cffea83d11d1b0ad2fc).


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44360229
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -302,6 +325,138 @@ private[netty] class NettyRpcEnv(
         }
       }
     
    +  override def fileServer: RpcEnvFileServer = streamManager
    +
    +  override def openChannel(uri: String): ReadableByteChannel = {
    +    val parsedUri = new URI(uri)
    +    require(parsedUri.getHost() != null, "Host name must be defined.")
    +    require(parsedUri.getPort() > 0, "Port must be defined.")
    +    require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty, "Path must be defined.")
    +
    +    val pipe = Pipe.open()
    +    val source = new FileDownloadChannel(pipe.source())
    +    try {
    +      val client = fileDownloadClient(parsedUri.getHost(), parsedUri.getPort())
    +      val callback = new FileDownloadCallback(pipe.sink(), source, client)
    +      client.stream(parsedUri.getPath(), callback)
    +    } catch {
    +      case e: Exception =>
    +        pipe.sink().close()
    +        source.close()
    +        throw e
    +    }
    +
    +    source
    +  }
    +
    +  private def fileDownloadClient(host: String, port: Int): TransportClient = synchronized {
    --- End diff --
    
    So if tasks in the same executor download from different hosts, they need to connect one by one, right?
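The concern here is that `fileDownloadClient` is `synchronized` on the whole `NettyRpcEnv`, so connection setup to different hosts is serialized. One hypothetical alternative, which is not what this patch does, is per-address initialization. The map lookup is lock-free, and the connect runs under initialization scoped to that address only. `Client`, `ClientPool` and the `connect` function are illustrative names.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical client type; in the real code this would be a TransportClient.
final case class Client(address: String)

// Hypothetical alternative to a single synchronized factory method: one
// holder per address, so connecting to host A never waits on host B.
final class ClientPool(connect: String => Client) {
  private final class Holder(address: String) {
    // Scala lazy vals are thread-safe: concurrent callers for the same holder
    // wait for the first initialization to finish and then share its result.
    lazy val client: Client = connect(address)
  }

  private val holders = new ConcurrentHashMap[String, Holder]()

  def get(address: String): Client = {
    val fresh = new Holder(address)
    // putIfAbsent guarantees a single Holder per address.
    val existing = holders.putIfAbsent(address, fresh)
    (if (existing == null) fresh else existing).client
  }
}
```

Whether this is worth the extra complexity depends on the access pattern. As noted in this thread, executors download jars and files from the driver, so in practice nearly every call targets the same address.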


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155269548
  
    **[Test build #45446 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45446/consoleFull)** for PR 9530 at commit [`a222c03`](https://github.com/apache/spark/commit/a222c0344738ec1bd19e6cffea83d11d1b0ad2fc).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155219305
  
    retest this please


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154752998
  
    retest this please


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155541099
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154577701
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155541164
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154768475
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155223319
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155269631
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45446/


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155219970
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155134307
  
    **[Test build #45383 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45383/consoleFull)** for PR 9530 at commit [`87b7a91`](https://github.com/apache/spark/commit/87b7a9174be76646ef814b42dc7d0172ccc23552).


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155610828
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155624877
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45547/


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155567361
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154764761
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-156601037
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155624875
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155203661
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154574543
  
    Merged build triggered.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154768480
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-156583779
  
    **[Test build #45904 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45904/consoleFull)** for PR 9530 at commit [`908a051`](https://github.com/apache/spark/commit/908a05121da0a5957845d4451e3841e6faa71493).


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154768450
  
    retest this please


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44350911
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -501,13 +625,15 @@ private[netty] class NettyRpcHandler(
         dispatcher.postRemoteMessage(messageToDispatch, callback)
       }
     
    -  override def getStreamManager: StreamManager = new OneForOneStreamManager
    +  override def getStreamManager: StreamManager = streamManager
     
       override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = {
         val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
         if (addr != null) {
    -      val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
    --- End diff --
    
    It's explained in the following comment:
    https://github.com/apache/spark/pull/9530/files#diff-0c89b4a60c30a7cd2224bb64d93da942R85
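
In the diff above, `getStreamManager` now returns a shared `streamManager` instead of creating a new `OneForOneStreamManager` on every call, and elsewhere in this PR `fileServer` returns that same `streamManager`. The following is a minimal Scala sketch of why a single shared instance matters. It assumes a hypothetical `FileStreamRegistry` that maps file names to files and a hypothetical `/files/<name>` stream-id format; neither is Spark's actual API.

```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified registry playing both roles: files are registered through
// the "file server" side and looked up when a stream request arrives.
final class FileStreamRegistry {
  private val files = new ConcurrentHashMap[String, File]()

  // Registers a file and returns the (assumed) stream id a client would request.
  def addFile(file: File): String = {
    val existing = files.putIfAbsent(file.getName, file)
    require(existing == null || existing == file, s"File ${file.getName} was already registered.")
    s"/files/${file.getName}"
  }

  // Resolves a requested stream id to a registered file, if there is one.
  def lookup(streamId: String): Option[File] =
    if (streamId.startsWith("/files/")) Option(files.get(streamId.stripPrefix("/files/")))
    else None
}
```

A separate instance created for serving streams would see none of the registrations, so `lookup` would return `None`. Returning the same object from both methods avoids that.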


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155576963
  
    Merged build triggered.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44455097
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -302,6 +325,138 @@ private[netty] class NettyRpcEnv(
         }
       }
     
    +  override def fileServer: RpcEnvFileServer = streamManager
    +
    +  override def openChannel(uri: String): ReadableByteChannel = {
    +    val parsedUri = new URI(uri)
    +    require(parsedUri.getHost() != null, "Host name must be defined.")
    +    require(parsedUri.getPort() > 0, "Port must be defined.")
    +    require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty, "Path must be defined.")
    +
    +    val pipe = Pipe.open()
    +    val source = new FileDownloadChannel(pipe.source())
    +    try {
    +      val client = fileDownloadClient(parsedUri.getHost(), parsedUri.getPort())
    +      val callback = new FileDownloadCallback(pipe.sink(), source, client)
    +      client.stream(parsedUri.getPath(), callback)
    +    } catch {
    +      case e: Exception =>
    +        pipe.sink().close()
    +        source.close()
    +        throw e
    +    }
    +
    +    source
    +  }
    +
    +  private def fileDownloadClient(host: String, port: Int): TransportClient = synchronized {
    +    if (stopped.get()) {
    +      throw new IllegalStateException("RpcEnv already stopped.")
    +    }
    +
    +    val address = RpcAddress(host, port)
    +    val client = fileClients.get(address).filter(_.isActive()).getOrElse(newDownloadClient(address))
    +
    +    // Tell the timeout handler this client is in use. This will prevent the handler from
    +    // closing the client if the timeout event triggers before data starts flowing for this
    +    // download.
    +    client.synchronized {
    +      val timeoutHandler = client.getChannel().pipeline().get(classOf[TimeoutHandler])
    +      timeoutHandler.setInUse(true)
    +
    +      // After notifying the timeout handler, check that the client is really active, and if not,
    +      // create a new one.
    +      if (client.isActive()) client else newDownloadClient(address)
    +    }
    +  }
    +
    +  /**
    +   * Create a new client and install a handler that will respond to IdleStateEvent. The events
    +   * are generated by the IdleStateHandler installed by the TransportContext when creating
    +   * clients, and the timeout value is controlled by the transport configuration.
    +   */
    +  private def newDownloadClient(addr: RpcAddress): TransportClient = {
    +    val c = clientFactory.createUnmanagedClient(addr.host, addr.port)
    +    c.getChannel().pipeline().addLast("rpcEnvTimeoutHandler", new TimeoutHandler(c))
    +    fileClients.put(addr, c)
    +    c
    +  }
    +
    +  private class TimeoutHandler(client: TransportClient) extends ChannelInboundHandlerAdapter {
    +
    +    @volatile private var inUse = true
    +
    +    def setInUse(inUse: Boolean): Unit = this.inUse = inUse
    +
    +    override def userEventTriggered(ctx: ChannelHandlerContext, evt: Object): Unit = {
    +      client.synchronized {
    +        if (!inUse && evt.isInstanceOf[IdleStateEvent] && client.isActive()) {
    +          logDebug(s"Closing transport client $client after idle timeout.")
    +          val socketAddr = client.getChannel().remoteAddress().asInstanceOf[InetSocketAddress]
    +          val address = RpcAddress(socketAddr.getHostName(), socketAddr.getPort())
    +          ctx.close()
    +          NettyRpcEnv.this.synchronized {
    --- End diff --
    
    The lock order here is the reverse of the one in `fileDownloadClient`. Is this a potential deadlock?
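
The concern is a classic lock-order inversion. `fileDownloadClient` holds the `NettyRpcEnv` monitor and then takes the client monitor, while the quoted `TimeoutHandler` holds the client monitor and then takes the `NettyRpcEnv` monitor. If two threads each grab their first lock, both wait forever. One conventional fix, shown here as a Scala sketch and not necessarily what this PR ended up doing, is to acquire the two monitors in the same global order on every path. `envLock` and `clientLock` are hypothetical stand-ins for the two monitors.

```scala
object LockOrderSketch {
  // Hypothetical stand-ins: envLock ~ the NettyRpcEnv monitor, clientLock ~ a client monitor.
  private val envLock = new Object
  private val clientLock = new Object
  private var inUse = false // guarded by clientLock

  // Download path: env lock first, then client lock (the order used by fileDownloadClient).
  def markInUse(): Unit = envLock.synchronized {
    clientLock.synchronized { inUse = true }
  }

  // Releasing needs only the client lock, which cannot create an ordering cycle.
  def release(): Unit = clientLock.synchronized { inUse = false }

  // Idle-timeout path taking the locks in the SAME order. Taking clientLock first and
  // envLock second, as the quoted TimeoutHandler does, can deadlock against markInUse.
  def onIdle(): Boolean = envLock.synchronized {
    clientLock.synchronized { !inUse } // true means the client would be closed and evicted
  }

  // Runs both paths concurrently; true means both threads finished within the timeout.
  def stress(iterations: Int, timeoutMs: Long): Boolean = {
    val download = new Thread(new Runnable {
      override def run(): Unit = (1 to iterations).foreach { _ => markInUse(); release() }
    })
    val idle = new Thread(new Runnable {
      override def run(): Unit = (1 to iterations).foreach(_ => onIdle())
    })
    Seq(download, idle).foreach { t => t.setDaemon(true); t.start() }
    download.join(timeoutMs)
    idle.join(timeoutMs)
    !download.isAlive && !idle.isAlive
  }
}
```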


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44362944
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -302,6 +325,138 @@ private[netty] class NettyRpcEnv(
         }
       }
     
    +  override def fileServer: RpcEnvFileServer = streamManager
    +
    +  override def openChannel(uri: String): ReadableByteChannel = {
    +    val parsedUri = new URI(uri)
    +    require(parsedUri.getHost() != null, "Host name must be defined.")
    +    require(parsedUri.getPort() > 0, "Port must be defined.")
    +    require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty, "Path must be defined.")
    +
    +    val pipe = Pipe.open()
    +    val source = new FileDownloadChannel(pipe.source())
    +    try {
    +      val client = fileDownloadClient(parsedUri.getHost(), parsedUri.getPort())
    +      val callback = new FileDownloadCallback(pipe.sink(), source, client)
    +      client.stream(parsedUri.getPath(), callback)
    +    } catch {
    +      case e: Exception =>
    +        pipe.sink().close()
    +        source.close()
    +        throw e
    +    }
    +
    +    source
    +  }
    +
    +  private def fileDownloadClient(host: String, port: Int): TransportClient = synchronized {
    --- End diff --
    
    That's fine. What's your worry here?
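
The `openChannel` method quoted in this diff turns a callback-driven stream into a pull-style `ReadableByteChannel`. It returns the source end of a `java.nio.channels.Pipe` to the caller, while the download callback writes into the sink end. The following is a minimal, self-contained Scala sketch of that bridging pattern. Here a plain background thread pushing hypothetical string chunks stands in for `FileDownloadCallback`; that substitution is an assumption made so the example runs without a network.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.{Pipe, ReadableByteChannel}
import java.nio.charset.StandardCharsets

object PipeBridgeSketch {
  // Returns the read end of a pipe; a background thread (standing in for a network
  // stream callback) pushes the given chunks into the write end.
  def openChannel(chunks: Seq[String]): ReadableByteChannel = {
    val pipe = Pipe.open()
    val producer = new Thread(new Runnable {
      override def run(): Unit = {
        val sink = pipe.sink()
        try {
          chunks.foreach { chunk =>
            val buf = ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8))
            while (buf.hasRemaining) sink.write(buf) // a single write may be partial
          }
        } finally {
          sink.close() // signals end-of-stream to the reader
        }
      }
    })
    producer.setDaemon(true)
    producer.start()
    pipe.source()
  }

  // Pull-style consumer: reads until read() returns -1, then closes the channel.
  def readAll(ch: ReadableByteChannel): String = {
    val out = new ByteArrayOutputStream()
    val buf = ByteBuffer.allocate(8192)
    try {
      while (ch.read(buf) != -1) {
        buf.flip()
        out.write(buf.array(), 0, buf.limit())
        buf.clear()
      }
    } finally {
      ch.close()
    }
    new String(out.toByteArray, StandardCharsets.UTF_8)
  }
}
```

The reader sees end-of-stream only after the sink is closed. Likewise, the `catch` block in `openChannel` closes both ends when setting up the download fails, so neither side is left open.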


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155244922
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154574561
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44352760
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -302,6 +325,105 @@ private[netty] class NettyRpcEnv(
         }
       }
     
    +  override def fileServer: RpcEnvFileServer = streamManager
    +
    +  override def openChannel(uri: String): ReadableByteChannel = {
    +    val parsedUri = new URI(uri)
    +    require(parsedUri.getHost() != null, "Host name must be defined.")
    +    require(parsedUri.getPort() > 0, "Port must be defined.")
    +    require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty, "Path must be defined.")
    +
    +    val pipe = Pipe.open()
    +    val source = new FileDownloadChannel(pipe.source())
    +    try {
    +      val callback = new FileDownloadCallback(pipe.sink(), source)
    +      val client = fileDownloadClient(parsedUri.getHost(), parsedUri.getPort())
    +      client.stream(parsedUri.getPath(), callback)
    +    } catch {
    +      case e: Exception =>
    +        pipe.sink().close()
    +        source.close()
    +        throw e
    +    }
    +
    +    source
    +  }
    +
    +  private def fileDownloadClient(host: String, port: Int): TransportClient = synchronized {
    +    if (stopped.get()) {
    +      throw new IllegalStateException("RpcEnv already stopped.")
    +    }
    +
    +    val address = RpcAddress(host, port)
    +    fileClients.get(address).filter(_.isActive()).getOrElse {
    +      // Create a new client and install a handler that will respond to IdleStateEvent. The events
    +      // are generated by the IdleStateHandler installed by the TransportContext when creating
    +      // clients, and the timeout value is controlled by the transport configuration.
    +      val c = clientFactory.createUnmanagedClient(host, port)
    +      c.getChannel().pipeline().addLast("rpcEnvTimeoutHandler", new TimeoutHandler(c))
    +      fileClients.put(address, c)
    +      c
    +    }
    +  }
    +
    +  private class TimeoutHandler(client: TransportClient) extends ChannelInboundHandlerAdapter {
    +
    +    override def userEventTriggered(ctx: ChannelHandlerContext, evt: Object): Unit = {
    --- End diff --
    
    So, this is a little bit racy, since the timeout might trigger when another thread is preparing to download a file. I'll fix this and update the PR.
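
The race described here is a check-then-act problem. One thread decides to reuse a cached client at the same moment the idle handler decides to close it. The following Scala sketch shows one way to close that gap. It assumes a hypothetical `IdleAwareConnection` in which the "claim for a download" and "close if idle" decisions are made under the same monitor. The later revision of this PR adds an `inUse` flag to `TimeoutHandler` for a similar purpose, but this sketch is not that code.

```scala
// Hypothetical connection wrapper; not Spark's TransportClient.
final class IdleAwareConnection {
  private var active = true // guarded by this
  private var inUse = false // guarded by this

  // Download path: claim the connection only if it is still open. Checking `active`
  // and setting `inUse` happen atomically with respect to onIdle().
  def tryClaim(): Boolean = synchronized {
    if (active) inUse = true
    active
  }

  // Called when a download finishes, making the connection eligible for an idle close.
  def release(): Unit = synchronized { inUse = false }

  // Idle-timeout path: close only if nobody has claimed the connection.
  def onIdle(): Boolean = synchronized {
    val close = active && !inUse
    if (close) active = false
    close
  }
}
```

A caller that gets `false` from `tryClaim` would create a fresh connection instead. This mirrors the `if (client.isActive()) client else newDownloadClient(address)` fallback in the later revision.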


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44349532
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -501,13 +625,15 @@ private[netty] class NettyRpcHandler(
         dispatcher.postRemoteMessage(messageToDispatch, callback)
       }
     
    -  override def getStreamManager: StreamManager = new OneForOneStreamManager
    +  override def getStreamManager: StreamManager = streamManager
     
       override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = {
         val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
         if (addr != null) {
    -      val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
    --- End diff --
    
    Sorry, I don't follow. The line was moved inside the new conditions; nothing's really changing.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155624815
  
    **[Test build #45547 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45547/consoleFull)** for PR 9530 at commit [`71ac0cf`](https://github.com/apache/spark/commit/71ac0cfdb25d7bb29e180c704a8eac840c7fccd4).
     * This patch **fails from timeout after a configured wait of `250m`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155219986
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154764742
  
    **[Test build #45292 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45292/consoleFull)** for PR 9530 at commit [`3f8209f`](https://github.com/apache/spark/commit/3f8209fe664836242ab849d4e8503c4443798556).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-156601001
  
    **[Test build #45904 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45904/consoleFull)** for PR 9530 at commit [`908a051`](https://github.com/apache/spark/commit/908a05121da0a5957845d4451e3841e6faa71493).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `case class AppendColumns[T, U](`


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155574985
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155236827
  
    LGTM. I noticed that `Utils.fetchFile` cannot handle special chars (e.g., `%`, ` `) in file names. But since the http server has the same issue, it should not block this PR.
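
For the special-character problem mentioned here, one common remedy is to build download URIs with the multi-argument `java.net.URI` constructor. This is an assumption, not something this PR does. That constructor percent-encodes characters that are illegal in a path and always quotes `%`. `getPath()` then decodes the name again on the receiving side. By contrast, concatenating the raw name into a URI string fails to parse when the name contains a space. The `spark` scheme, host, port and `/files/` prefix below are placeholders.

```scala
import java.net.{URI, URISyntaxException}

object FileUriSketch {
  // Builds a URI for a file name that may contain spaces or '%'; the multi-argument
  // constructor quotes illegal path characters and always quotes '%'.
  def fileUri(host: String, port: Int, fileName: String): URI =
    new URI("spark", null, host, port, s"/files/$fileName", null, null)

  // Naive string concatenation, which fails to parse for names such as "a b.jar".
  def naiveUri(host: String, port: Int, fileName: String): Either[String, URI] =
    try Right(new URI(s"spark://$host:$port/files/$fileName"))
    catch { case e: URISyntaxException => Left(e.getReason) }
}
```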


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154753111
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-155542223
  
    **[Test build #45540 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45540/consoleFull)** for PR 9530 at commit [`a89e665`](https://github.com/apache/spark/commit/a89e6655e74209d9b89f5dcb2b1b8ed8e3399d42).


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154768711
  
    **[Test build #45293 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45293/consoleFull)** for PR 9530 at commit [`3f8209f`](https://github.com/apache/spark/commit/3f8209fe664836242ab849d4e8503c4443798556).


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9530#issuecomment-154575961
  
    /cc @rxin @zsxwing 


[GitHub] spark pull request: [SPARK-11140] [core] Transfer files using netw...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9530#discussion_r44351354
  
    --- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala ---
    @@ -501,13 +625,15 @@ private[netty] class NettyRpcHandler(
         dispatcher.postRemoteMessage(messageToDispatch, callback)
       }
     
    -  override def getStreamManager: StreamManager = new OneForOneStreamManager
    +  override def getStreamManager: StreamManager = streamManager
     
       override def exceptionCaught(cause: Throwable, client: TransportClient): Unit = {
         val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
         if (addr != null) {
    -      val clientAddr = RpcAddress(addr.getHostName, addr.getPort)
    --- End diff --
    
    Got it. Thanks!

