You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by rxin <gi...@git.apache.org> on 2014/09/03 00:26:03 UTC

[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

GitHub user rxin opened a pull request:

    https://github.com/apache/spark/pull/2240

    [SPARK-3019] Pluggable block transfer interface (BlockTransferService)

    This pull request creates a new BlockTransferService interface for block fetch/upload.
    
    The pull request also refactors the existing ConnectionManager to implement BlockTransferService (CMBlockTransferService). 
    
    TODOs that should be part of this PR:
    - [ ] Bring this up to date with master
    - [ ] Look at the removed unit tests and see if they still make sense to be added back
    
    TODOs that should be separate PRs:
    - [ ] Implement NettyBlockTransferService
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rxin/spark blockTransferService

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2240
    
----
commit 9ef279ccfc6b8fedf4e3eb24b9cd7b5bc4ce7424
Author: Reynold Xin <rx...@apache.org>
Date:   2014-08-28T21:20:12Z

    Initial refactoring to move ConnectionManager to use the BlockTransferService.

commit d8d595c161fd215c3b2e39f5130df94077880fd3
Author: Reynold Xin <rx...@apache.org>
Date:   2014-08-28T21:21:57Z

    Merge branch 'master' of github.com:apache/spark into blockTransferService
    
    Conflicts:
    	core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
    	core/src/main/scala/org/apache/spark/storage/BlockManager.scala
    	core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala

commit ae05fcd47b52f0da3db669d74888f3cc0780f33b
Author: Reynold Xin <rx...@apache.org>
Date:   2014-09-02T08:06:26Z

    Updated tests, although DistributedSuite is hanging.

commit 98c668ae98e6e7d3d22504b3607527ad162356fc
Author: Reynold Xin <rx...@apache.org>
Date:   2014-09-02T22:03:58Z

    Added failure handling and fixed unit tests.

commit 07ccf0db4c250c0160e42cf6efb030e7502d30e7
Author: Reynold Xin <rx...@apache.org>
Date:   2014-09-02T22:12:54Z

    Added init check to CMBlockTransferService.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54259809
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19644/consoleFull) for   PR 2240 at commit [`1332156`](https://github.com/apache/spark/commit/13321569b169b173444e5c8a4aab2975ebc3244d).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17029823
  
    --- Diff: core/src/main/scala/org/apache/spark/network/BlockTransferService.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network
    +
    +import org.apache.spark.storage.StorageLevel
    +
    +
    +abstract class BlockTransferService {
    +
    +  /**
    +   * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
    +   * local blocks or put local blocks.
    +   */
    +  def init(blockDataManager: BlockDataManager)
    +
    +  /**
    +   * Tear down the transfer service.
    +   */
    +  def stop(): Unit
    +
    +  /**
    +   * Port number the service is listening on, available only after [[init]] is invoked.
    +   */
    +  def port: Int
    +
    +  /**
    +   * Host name the service is listening on, available only after [[init]] is invoked.
    +   */
    +  def hostName: String
    +
    +  /**
    +   * Fetch a sequence of blocks from a remote node asynchronously,
    +   * available only after [[init]] is invoked.
    +   *
    +   * Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
    +   * while [[BlockFetchingListener.onBlockFetchSuccess]] is called once per failure.
    +   *
    +   * This takes a sequence so the implementation can batch requests.
    +   */
    +  def fetchBlocks(
    +      hostName: String,
    +      port: Int,
    +      blockIds: Seq[String],
    +      listener: BlockFetchingListener): Unit
    +
    +  /**
    +   * Fetch a single block from a remote node, synchronously,
    +   * available only after [[init]] is invoked.
    +   */
    +  def fetchBlock(hostName: String, port: Int, blockId: String): ManagedBuffer = {
    --- End diff --
    
    Is this implemented here just for ease of testing ? I mean why do we need this single block fetch + blocking listener as a special case ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2240


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54259036
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19641/consoleFull) for   PR 2240 at commit [`2960c93`](https://github.com/apache/spark/commit/2960c93a85db42bb6a018e28c44cde2aed73f3d6).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17031089
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -0,0 +1,267 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.collection.mutable.HashSet
    +import scala.collection.mutable.Queue
    +
    +import org.apache.spark.{TaskContext, Logging, SparkException}
    +import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, BlockTransferService}
    +import org.apache.spark.serializer.Serializer
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * An iterator that fetches multiple blocks. For local blocks, it fetches from the local block
    + * manager. For remote blocks, it fetches them using the provided BlockTransferService.
    + *
    + * This creates an iterator of (BlockID, values) tuples so the caller can handle blocks in a
    + * pipelined fashion as they are received.
    + *
    + * The implementation throttles the remote fetches to they don't exceed maxBytesInFlight to avoid
    + * using too much memory.
    + *
    + * @param context
    + * @param blockManager
    + * @param blocksByAddress
    + * @param serializer
    + * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point.
    + */
    +private[spark]
    +final class ShuffleBlockFetcherIterator(
    +    context: TaskContext,
    +    blockTransferService: BlockTransferService,
    +    blockManager: BlockManager,
    +    blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
    +    serializer: Serializer,
    +    maxBytesInFlight: Long)
    +  extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
    +
    +  import ShuffleBlockFetcherIterator._
    +
    +  /**
    +   * Total number of blocks to fetch. This can be smaller than the total number of blocks
    +   * in [[blocksByAddress]] because we filter out zero-sized blocks in [[initialize]].
    +   *
    +   * This should equal localBlocks.size + remoteBlocks.size.
    +   */
    +  private[this] var numBlocksToFetch = 0
    +
    +  /**
    +   * The number of blocks proccessed by the caller. The iterator is exhausted when
    +   * [[numBlocksProcessed]] == [[numBlocksToFetch]].
    +   */
    +  private[this] var numBlocksProcessed = 0
    +
    +  private[this] val startTime = System.currentTimeMillis
    +
    +  /** Local blocks to fetch, excluding zero-sized blocks. */
    +  private[this] val localBlocks = new ArrayBuffer[BlockId]()
    +
    +  /** Remote blocks to fetch, excluding zero-sized blocks. */
    +  private[this] val remoteBlocks = new HashSet[BlockId]()
    +
    +  /**
    +   * A queue to hold our results. This turns the asynchronous model provided by
    +   * [[BlockTransferService]] into a synchronous model (iterator).
    +   */
    +  private[this] val results = new LinkedBlockingQueue[FetchResult]
    +
    +  // Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that
    +  // the number of bytes in flight is limited to maxBytesInFlight
    +  private[this] val fetchRequests = new Queue[FetchRequest]
    +
    +  // Current bytes in flight from our requests
    +  private[this] var bytesInFlight = 0L
    +
    +  private[this] val shuffleMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
    +
    +  initialize()
    +
    +  private[this] def sendRequest(req: FetchRequest) {
    --- End diff --
    
    note to reviewers: this is the function that changed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17029359
  
    --- Diff: core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala ---
    @@ -0,0 +1,37 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network
    +
    +import java.util.EventListener
    +
    +
    --- End diff --
    
    Both are ok actually. I usually do 2 myself. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17262643
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala ---
    @@ -166,34 +167,30 @@ class FileShuffleBlockManager(conf: SparkConf)
         }
       }
     
    -  /**
    -   * Returns the physical file segment in which the given BlockId is located.
    -   */
    -  private def getBlockLocation(id: ShuffleBlockId): FileSegment = {
    +  override def getBytes(blockId: ShuffleBlockId): Option[ByteBuffer] = {
    +    val segment = getBlockData(blockId)
    +    Some(segment.nioByteBuffer())
    +  }
    +
    +  override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
    --- End diff --
    
    Note to reviewer: This is changed to return a ManagedBuffer, which can return an input stream directly from the underlying file rather than copying the entire underlying content to a bytebuffer and create an input stream from that bytebuffer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17032386
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
    @@ -223,14 +226,14 @@ object SparkEnv extends Logging {
     
         val shuffleMemoryManager = new ShuffleMemoryManager(conf)
     
    +    val blockTransferService = new CMBlockTransferService(conf, securityManager)
    +
    --- End diff --
    
    yes


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by colorant <gi...@git.apache.org>.
Github user colorant commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17028474
  
    --- Diff: core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala ---
    @@ -0,0 +1,37 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network
    +
    +import java.util.EventListener
    +
    +
    --- End diff --
    
    so the convention is one or two empty line after import?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17031085
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -0,0 +1,267 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.collection.mutable.HashSet
    +import scala.collection.mutable.Queue
    +
    +import org.apache.spark.{TaskContext, Logging, SparkException}
    +import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, BlockTransferService}
    +import org.apache.spark.serializer.Serializer
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * An iterator that fetches multiple blocks. For local blocks, it fetches from the local block
    + * manager. For remote blocks, it fetches them using the provided BlockTransferService.
    + *
    + * This creates an iterator of (BlockID, values) tuples so the caller can handle blocks in a
    + * pipelined fashion as they are received.
    + *
    + * The implementation throttles the remote fetches to they don't exceed maxBytesInFlight to avoid
    + * using too much memory.
    + *
    + * @param context
    + * @param blockManager
    + * @param blocksByAddress
    + * @param serializer
    + * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point.
    + */
    +private[spark]
    +final class ShuffleBlockFetcherIterator(
    +    context: TaskContext,
    +    blockTransferService: BlockTransferService,
    +    blockManager: BlockManager,
    +    blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
    --- End diff --
    
    note to self: explain this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17031806
  
    --- Diff: core/src/main/scala/org/apache/spark/network/BlockTransferService.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network
    +
    +import org.apache.spark.storage.StorageLevel
    +
    +
    +abstract class BlockTransferService {
    +
    +  /**
    +   * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
    +   * local blocks or put local blocks.
    +   */
    +  def init(blockDataManager: BlockDataManager)
    +
    +  /**
    +   * Tear down the transfer service.
    +   */
    +  def stop(): Unit
    +
    +  /**
    +   * Port number the service is listening on, available only after [[init]] is invoked.
    +   */
    +  def port: Int
    +
    +  /**
    +   * Host name the service is listening on, available only after [[init]] is invoked.
    +   */
    +  def hostName: String
    +
    +  /**
    +   * Fetch a sequence of blocks from a remote node asynchronously,
    +   * available only after [[init]] is invoked.
    +   *
    +   * Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
    +   * while [[BlockFetchingListener.onBlockFetchSuccess]] is called once per failure.
    --- End diff --
    
    yes - going to update it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17029371
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network
    +
    +import java.io.{RandomAccessFile, File, FileInputStream, InputStream}
    +import java.nio.ByteBuffer
    +import java.nio.channels.FileChannel.MapMode
    +
    +import io.netty.buffer.{ByteBufInputStream, ByteBuf, Unpooled}
    +import io.netty.channel.DefaultFileRegion
    +
    +import org.apache.spark.storage.FileSegment
    +import org.apache.spark.util.ByteBufferInputStream
    +
    +
    +/**
    + * Provides a buffer abstraction that allows pooling and reuse.
    + */
    +abstract class ManagedBuffer {
    +  // Note that all the methods are defined with parenthesis because their implementations can
    +  // have side effects (io operations).
    +
    +  def byteBuffer(): ByteBuffer
    +
    +  def fileSegment(): Option[FileSegment] = None
    +
    +  def inputStream(): InputStream = throw new UnsupportedOperationException
    +
    +  def release(): Unit = throw new UnsupportedOperationException
    +
    +  def size: Long
    +
    +  private[network] def toNetty(): AnyRef
    +}
    +
    +
    +/**
    + * A ManagedBuffer backed by a segment in a file.
    + */
    +final class FileSegmentManagedBuffer(file: File, offset: Long, length: Long)
    +  extends ManagedBuffer {
    +
    +  override def size: Long = length
    +
    +  override def byteBuffer(): ByteBuffer = {
    +    val channel = new RandomAccessFile(file, "r").getChannel
    --- End diff --
    
    The problem is conf propagation. I will add a todo to enable that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17029890
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network
    +
    +import java.io.{RandomAccessFile, File, FileInputStream, InputStream}
    +import java.nio.ByteBuffer
    +import java.nio.channels.FileChannel.MapMode
    +
    +import io.netty.buffer.{ByteBufInputStream, ByteBuf, Unpooled}
    +import io.netty.channel.DefaultFileRegion
    +
    +import org.apache.spark.storage.FileSegment
    +import org.apache.spark.util.ByteBufferInputStream
    +
    +
    +/**
    + * Provides a buffer abstraction that allows pooling and reuse.
    + */
    +abstract class ManagedBuffer {
    +  // Note that all the methods are defined with parenthesis because their implementations can
    +  // have side effects (io operations).
    +
    +  def byteBuffer(): ByteBuffer
    +
    +  def fileSegment(): Option[FileSegment] = None
    +
    +  def inputStream(): InputStream = throw new UnsupportedOperationException
    +
    +  def release(): Unit = throw new UnsupportedOperationException
    +
    +  def size: Long
    +
    +  private[network] def toNetty(): AnyRef
    --- End diff --
    
    toNetty just sounds like a weird thing to have in a general class like ManagedBuffer. Can we hide this behind something like NettyManagedBuffer ? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17031070
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network
    +
    +import java.io.{RandomAccessFile, File, FileInputStream, InputStream}
    +import java.nio.ByteBuffer
    +import java.nio.channels.FileChannel.MapMode
    +
    +import io.netty.buffer.{ByteBufInputStream, ByteBuf, Unpooled}
    +import io.netty.channel.DefaultFileRegion
    +
    +import org.apache.spark.storage.FileSegment
    +import org.apache.spark.util.ByteBufferInputStream
    +
    +
    +/**
    + * Provides a buffer abstraction that allows pooling and reuse.
    + */
    +abstract class ManagedBuffer {
    +  // Note that all the methods are defined with parenthesis because their implementations can
    +  // have side effects (io operations).
    +
    +  def byteBuffer(): ByteBuffer
    +
    +  def fileSegment(): Option[FileSegment] = None
    +
    +  def inputStream(): InputStream = throw new UnsupportedOperationException
    +
    +  def release(): Unit = throw new UnsupportedOperationException
    +
    +  def size: Long
    +
    +  private[network] def toNetty(): AnyRef
    --- End diff --
    
    toNetty actually applies to all managed buffers. We can move this into the netty package, but in that case we lose the compile time check because somebody could add a new implementation of the ManagedBuffer and forget to update the netty part of the code. 
    
    That said, I will add some inline doc to explain the purpose of this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54255668
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19639/consoleFull) for   PR 2240 at commit [`e29c721`](https://github.com/apache/spark/commit/e29c721132fcee79d65d4b6e30dd4ee46a814ef7).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54238121
  
    @colorant can you take a look at this since it interfaces with some of your changes?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by colorant <gi...@git.apache.org>.
Github user colorant commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17032365
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
    @@ -223,14 +226,14 @@ object SparkEnv extends Logging {
     
         val shuffleMemoryManager = new ShuffleMemoryManager(conf)
     
    +    val blockTransferService = new CMBlockTransferService(conf, securityManager)
    +
    --- End diff --
    
    So, some conf will be add here for specific BlockTransferService selection later?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54248714
  
    I took a very brief look -- Will take a closer look when I find some more time. But here are a couple of high-level comments
    
    1. It would be better IMHO to split this into 2 PRs, one just for renaming / moving classes and the other for adding new stuff. That would make reviewing much easier (for example I don't know if any code changed in ShuffleBlockFetchIterator right now)
    
    2. I am not a big fan of the package name `cm`. I know `connectionmanager` is a mouthful, but my suggestion would actually be to keep everything in `network` till we figure out how to split things into sub-packages. I think ideally we'd like common traits / abstract classes to be in network/ and then have `netty` and `nio` etc. as sub-packages (the connection manager uses nio right ?)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54877023
  
    Jenkins, test this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17032847
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
    @@ -223,14 +226,14 @@ object SparkEnv extends Logging {
     
         val shuffleMemoryManager = new ShuffleMemoryManager(conf)
     
    +    val blockTransferService = new CMBlockTransferService(conf, securityManager)
    +
    --- End diff --
    
    We can have it in init or the SparkEnv way (which is not ideal since I want to minimize dependency on SparkEnv). We can worry about that in my next PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54242064
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19613/consoleFull) for   PR 2240 at commit [`2a907e4`](https://github.com/apache/spark/commit/2a907e4bb00d579ae9b1f4724ba2a58200a9596d).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17032690
  
    --- Diff: core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.nio
    +
    +import java.nio.ByteBuffer
    +
    +import scala.concurrent.Future
    +
    +import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
    +import org.apache.spark.network._
    +import org.apache.spark.storage.{BlockId, StorageLevel}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * A [[BlockTransferService]] implementation based on [[ConnectionManager]], a custom
    + * implementation using Java NIO.
    + */
    +final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityManager)
    +  extends BlockTransferService with Logging {
    +
    +  private var cm: ConnectionManager = _
    +
    +  private var blockDataManager: BlockDataManager = _
    +
    +  /**
    +   * Port number the service is listening on, available only after [[init]] is invoked.
    +   */
    +  override def port: Int = {
    +    checkInit()
    +    cm.id.port
    +  }
    +
    +  /**
    +   * Host name the service is listening on, available only after [[init]] is invoked.
    +   */
    +  override def hostName: String = {
    +    checkInit()
    +    cm.id.host
    +  }
    +
    +  /**
    +   * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
    +   * local blocks or put local blocks.
    +   */
    +  override def init(blockDataManager: BlockDataManager): Unit = {
    +    this.blockDataManager = blockDataManager
    +    cm = new ConnectionManager(
    +      conf.getInt("spark.blockManager.port", 0),
    +      conf,
    +      securityManager,
    +      "Connection manager for block manager")
    +    cm.onReceiveMessage(onBlockMessageReceive)
    +  }
    +
    +  /**
    +   * Tear down the transfer service.
    +   */
    +  override def stop(): Unit = {
    +    if (cm != null) {
    +      cm.stop()
    +    }
    +  }
    +
    +  override def fetchBlocks(
    +      hostName: String,
    +      port: Int,
    +      blockIds: Seq[String],
    +      listener: BlockFetchingListener): Unit = {
    +    checkInit()
    +
    +    val cmId = new ConnectionManagerId(hostName, port)
    +    val blockMessageArray = new BlockMessageArray(blockIds.map { blockId =>
    +      BlockMessage.fromGetBlock(GetBlock(BlockId(blockId)))
    +    })
    +
    +    val future = cm.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
    +
    +    // Register the listener on success/failure future callback.
    +    future.onSuccess { case message =>
    +      val bufferMessage = message.asInstanceOf[BufferMessage]
    +      val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
    +
    +      for (blockMessage <- blockMessageArray) {
    +        if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
    +          listener.onBlockFetchFailure(
    +            new SparkException(s"Unexpected message ${blockMessage.getType} received from $cmId"))
    +        } else {
    +          val blockId = blockMessage.getId
    +          val networkSize = blockMessage.getData.limit()
    +          listener.onBlockFetchSuccess(
    +            blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData))
    +        }
    +      }
    +    }(cm.futureExecContext)
    +
    +    future.onFailure { case exception =>
    +      listener.onBlockFetchFailure(exception)
    +    }(cm.futureExecContext)
    +  }
    +
    +  /**
    +   * Upload a single block to a remote node, available only after [[init]] is invoked.
    +   *
    +   * This call blocks until the upload completes, or throws an exception upon failures.
    +   */
    +  override def uploadBlock(
    +      hostname: String,
    +      port: Int,
    +      blockId: String,
    +      blockData: ManagedBuffer,
    +      level: StorageLevel)
    +    : Future[Unit] = {
    +    checkInit()
    +    val msg = PutBlock(BlockId(blockId), blockData.nioByteBuffer(), level)
    +    val blockMessageArray = new BlockMessageArray(BlockMessage.fromPutBlock(msg))
    +    val remoteCmId = new ConnectionManagerId(hostName, port)
    +    val reply = cm.sendMessageReliably(remoteCmId, blockMessageArray.toBufferMessage)
    +    reply.map(x => ())(cm.futureExecContext)
    +  }
    +
    +  private def checkInit(): Unit = if (cm == null) {
    +    throw new IllegalStateException(getClass.getName + " has not been initialized")
    +  }
    +
    +  private def onBlockMessageReceive(msg: Message, id: ConnectionManagerId): Option[Message] = {
    --- End diff --
    
    note to reviewer: everything after this line is copied from old BlockManagerWorker code


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54233613
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19608/consoleFull) for   PR 2240 at commit [`07ccf0d`](https://github.com/apache/spark/commit/07ccf0db4c250c0160e42cf6efb030e7502d30e7).
     * This patch **fails** unit tests.
     * This patch **does not** merge cleanly!



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54238092
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19613/consoleFull) for   PR 2240 at commit [`2a907e4`](https://github.com/apache/spark/commit/2a907e4bb00d579ae9b1f4724ba2a58200a9596d).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54900165
  
    Ok I talked to @pwendell and @shivaram offline. I'm going to merge this for now since the pull request is growing larger. Feel free to continue commenting on the pull request, and I will address these comments as separte pull requests.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54259106
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19641/consoleFull) for   PR 2240 at commit [`2960c93`](https://github.com/apache/spark/commit/2960c93a85db42bb6a018e28c44cde2aed73f3d6).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by colorant <gi...@git.apache.org>.
Github user colorant commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17032661
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
    @@ -223,14 +226,14 @@ object SparkEnv extends Logging {
     
         val shuffleMemoryManager = new ShuffleMemoryManager(conf)
     
    +    val blockTransferService = new CMBlockTransferService(conf, securityManager)
    +
    --- End diff --
    
    Then, how did you plan to handle this securityManager parameter? move it into BlockTransferService interface ( though some implementation might not need it) to  make it easy for instantiate works? or some how tweak connectionManager to make it use SecurityManager in lazy way so that it can retrive it from SparkEnv?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17029964
  
    --- Diff: core/src/main/scala/org/apache/spark/network/cm/CMBlockTransferService.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network.cm
    +
    +import java.nio.ByteBuffer
    +
    +import scala.concurrent.Await
    +import scala.concurrent.duration.Duration
    +
    +import org.apache.spark.{SparkException, Logging, SecurityManager, SparkConf}
    +import org.apache.spark.network._
    +import org.apache.spark.storage.{BlockId, StorageLevel}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * A [[BlockTransferService]] implementation based on our [[ConnectionManager]].
    + */
    +final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityManager)
    +  extends BlockTransferService with Logging {
    +
    +  private var cm: ConnectionManager = _
    +
    +  private var blockDataManager: BlockDataManager = _
    +
    +  /**
    +   * Port number the service is listening on, available only after [[init]] is invoked.
    +   */
    +  override def port: Int = {
    +    checkInit()
    +    cm.id.port
    +  }
    +
    +  /**
    +   * Host name the service is listening on, available only after [[init]] is invoked.
    +   */
    +  override def hostName: String = {
    +    checkInit()
    +    cm.id.host
    +  }
    +
    +  /**
    +   * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
    +   * local blocks or put local blocks.
    +   */
    +  override def init(blockDataManager: BlockDataManager): Unit = {
    +    this.blockDataManager = blockDataManager
    +    cm = new ConnectionManager(
    +      conf.getInt("spark.blockManager.port", 0),
    +      conf,
    +      securityManager,
    +      "Connection manager for block manager")
    +    cm.onReceiveMessage(onBlockMessageReceive)
    +  }
    +
    +  /**
    +   * Tear down the transfer service.
    +   */
    +  override def stop(): Unit = {
    +    if (cm != null) {
    +      cm.stop()
    +    }
    +  }
    +
    +  override def fetchBlocks(
    +      hostName: String,
    +      port: Int,
    +      blockIds: Seq[String],
    +      listener: BlockFetchingListener): Unit = {
    +    checkInit()
    +
    +    val cmId = new ConnectionManagerId(hostName, port)
    +    val blockMessageArray = new BlockMessageArray(blockIds.map { blockId =>
    +      BlockMessage.fromGetBlock(GetBlock(BlockId(blockId)))
    +    })
    +
    +    val future = cm.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
    +
    +    // Register the listener on success/failure future callback.
    +    future.onSuccess { case message =>
    +      val bufferMessage = message.asInstanceOf[BufferMessage]
    +      val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
    +
    +      for (blockMessage <- blockMessageArray) {
    +        if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
    +          listener.onBlockFetchFailure(
    +            new SparkException(s"Unexpected message ${blockMessage.getType} received from $cmId"))
    +        } else {
    +          val blockId = blockMessage.getId
    +          val networkSize = blockMessage.getData.limit()
    +          listener.onBlockFetchSuccess(
    +            blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData))
    +        }
    +      }
    +    }(cm.futureExecContext)
    +
    +    future.onFailure { case exception =>
    +      listener.onBlockFetchFailure(exception)
    +    }(cm.futureExecContext)
    +  }
    +
    +  /**
    +   * Upload a single block to a remote node, available only after [[init]] is invoked.
    +   *
    +   * This call blocks until the upload completes, or throws an exception upon failures.
    +   */
    +  override def uploadBlock(
    +      hostname: String,
    +      port: Int,
    +      blockId: String,
    +      blockData: ManagedBuffer,
    +      level: StorageLevel) {
    +    checkInit()
    +    val msg = PutBlock(BlockId(blockId), blockData.byteBuffer(), level)
    +    val blockMessageArray = new BlockMessageArray(BlockMessage.fromPutBlock(msg))
    +    val remoteCmId = new ConnectionManagerId(hostName, port)
    +
    +    // TODO: Not wait infinitely.
    --- End diff --
    
    this would go away if we had a listener interface here ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54227712
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19608/consoleFull) for   PR 2240 at commit [`07ccf0d`](https://github.com/apache/spark/commit/07ccf0db4c250c0160e42cf6efb030e7502d30e7).
     * This patch **does not** merge cleanly!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54255463
  
    I talked to @shivaram offline. It is not really going to simplify review if I break this into two PRs because the renaming part is very small and has only about ~50 loc changes.
    
    The main thing to review is the high level interface (which we can change later also since this is an internal interface), and the sendRequest function in ShuffleBlockFetcherIterator.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17032727
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -0,0 +1,270 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import java.util.concurrent.LinkedBlockingQueue
    +
    +import scala.collection.mutable.ArrayBuffer
    +import scala.collection.mutable.HashSet
    +import scala.collection.mutable.Queue
    +
    +import org.apache.spark.{TaskContext, Logging, SparkException}
    +import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener, BlockTransferService}
    +import org.apache.spark.serializer.Serializer
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * An iterator that fetches multiple blocks. For local blocks, it fetches from the local block
    + * manager. For remote blocks, it fetches them using the provided BlockTransferService.
    + *
    + * This creates an iterator of (BlockID, values) tuples so the caller can handle blocks in a
    + * pipelined fashion as they are received.
    + *
    + * The implementation throttles the remote fetches to they don't exceed maxBytesInFlight to avoid
    + * using too much memory.
    + *
    + * @param context [[TaskContext]], used for metrics update
    + * @param blockTransferService [[BlockTransferService]] for fetching remote blocks
    + * @param blockManager  [[BlockManager]] for reading local blocks
    + * @param blocksByAddress list of blocks to fetch grouped by the [[BlockManagerId]].
    + *                        For each block we also require the size (in bytes as a long field) in
    + *                        order to throttle the memory usage.
    + * @param serializer serializer used to deserialize the data.
    + * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point.
    + */
    +private[spark]
    +final class ShuffleBlockFetcherIterator(
    +    context: TaskContext,
    +    blockTransferService: BlockTransferService,
    +    blockManager: BlockManager,
    +    blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
    +    serializer: Serializer,
    +    maxBytesInFlight: Long)
    +  extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
    +
    +  import ShuffleBlockFetcherIterator._
    +
    +  /**
    +   * Total number of blocks to fetch. This can be smaller than the total number of blocks
    +   * in [[blocksByAddress]] because we filter out zero-sized blocks in [[initialize]].
    +   *
    +   * This should equal localBlocks.size + remoteBlocks.size.
    +   */
    +  private[this] var numBlocksToFetch = 0
    +
    +  /**
    +   * The number of blocks proccessed by the caller. The iterator is exhausted when
    +   * [[numBlocksProcessed]] == [[numBlocksToFetch]].
    +   */
    +  private[this] var numBlocksProcessed = 0
    +
    +  private[this] val startTime = System.currentTimeMillis
    +
    +  /** Local blocks to fetch, excluding zero-sized blocks. */
    +  private[this] val localBlocks = new ArrayBuffer[BlockId]()
    +
    +  /** Remote blocks to fetch, excluding zero-sized blocks. */
    +  private[this] val remoteBlocks = new HashSet[BlockId]()
    +
    +  /**
    +   * A queue to hold our results. This turns the asynchronous model provided by
    +   * [[BlockTransferService]] into a synchronous model (iterator).
    +   */
    +  private[this] val results = new LinkedBlockingQueue[FetchResult]
    +
    +  // Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that
    +  // the number of bytes in flight is limited to maxBytesInFlight
    +  private[this] val fetchRequests = new Queue[FetchRequest]
    +
    +  // Current bytes in flight from our requests
    +  private[this] var bytesInFlight = 0L
    +
    +  private[this] val shuffleMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
    +
    +  initialize()
    +
    +  private[this] def sendRequest(req: FetchRequest) {
    +    logDebug("Sending request for %d blocks (%s) from %s".format(
    +      req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
    +    bytesInFlight += req.size
    +
    +    // so we can look up the size of each blockID
    +    val sizeMap = req.blocks.map { case (blockId, size) => (blockId.toString, size) }.toMap
    +    val blockIds = req.blocks.map(_._1.toString)
    +
    +    blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
    +      new BlockFetchingListener {
    +        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    +          results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    +            () => blockManager.dataDeserialize(BlockId(blockId), data.nioByteBuffer(), serializer)
    +          ))
    +          shuffleMetrics.remoteBytesRead += data.size
    +          shuffleMetrics.remoteBlocksFetched += 1
    +          logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
    +        }
    +
    +        override def onBlockFetchFailure(e: Throwable): Unit = {
    +          logError("Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
    +          // Note that there is a chance that some blocks have been fetched successfully, but we
    +          // still add them to the failed queue. This is fine because when the caller see a
    +          // FetchFailedException, it is going to fail the entire task anyway.
    +          for ((blockId, size) <- req.blocks) {
    +            results.put(new FetchResult(blockId, -1, null))
    +          }
    +        }
    +      }
    +    )
    +  }
    +
    +  private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
    --- End diff --
    
    everything after this line in this file is copied from the old code. no need to review


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54255610
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19639/consoleFull) for   PR 2240 at commit [`e29c721`](https://github.com/apache/spark/commit/e29c721132fcee79d65d4b6e30dd4ee46a814ef7).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by colorant <gi...@git.apache.org>.
Github user colorant commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17028088
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala ---
    @@ -0,0 +1,101 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network
    +
    +import java.io.{RandomAccessFile, File, FileInputStream, InputStream}
    +import java.nio.ByteBuffer
    +import java.nio.channels.FileChannel.MapMode
    +
    +import io.netty.buffer.{ByteBufInputStream, ByteBuf, Unpooled}
    +import io.netty.channel.DefaultFileRegion
    +
    +import org.apache.spark.storage.FileSegment
    +import org.apache.spark.util.ByteBufferInputStream
    +
    +
    +/**
    + * Provides a buffer abstraction that allows pooling and reuse.
    + */
    +abstract class ManagedBuffer {
    +  // Note that all the methods are defined with parenthesis because their implementations can
    +  // have side effects (io operations).
    +
    +  def byteBuffer(): ByteBuffer
    +
    +  def fileSegment(): Option[FileSegment] = None
    +
    +  def inputStream(): InputStream = throw new UnsupportedOperationException
    +
    +  def release(): Unit = throw new UnsupportedOperationException
    +
    +  def size: Long
    +
    +  private[network] def toNetty(): AnyRef
    +}
    +
    +
    +/**
    + * A ManagedBuffer backed by a segment in a file.
    + */
    +final class FileSegmentManagedBuffer(file: File, offset: Long, length: Long)
    +  extends ManagedBuffer {
    +
    +  override def size: Long = length
    +
    +  override def byteBuffer(): ByteBuffer = {
    +    val channel = new RandomAccessFile(file, "r").getChannel
    --- End diff --
    
    Add the minMemoryMapBytes judgement here for directly read small files?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54892840
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19994/consoleFull) for   PR 2240 at commit [`64cd9d7`](https://github.com/apache/spark/commit/64cd9d7f258530b3252e8cd4aaa7fd04f6752278).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54884747
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19994/consoleFull) for   PR 2240 at commit [`64cd9d7`](https://github.com/apache/spark/commit/64cd9d7f258530b3252e8cd4aaa7fd04f6752278).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17029861
  
    --- Diff: core/src/main/scala/org/apache/spark/network/BlockTransferService.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network
    +
    +import org.apache.spark.storage.StorageLevel
    +
    +
    +abstract class BlockTransferService {
    +
    +  /**
    +   * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
    +   * local blocks or put local blocks.
    +   */
    +  def init(blockDataManager: BlockDataManager)
    +
    +  /**
    +   * Tear down the transfer service.
    +   */
    +  def stop(): Unit
    +
    +  /**
    +   * Port number the service is listening on, available only after [[init]] is invoked.
    +   */
    +  def port: Int
    +
    +  /**
    +   * Host name the service is listening on, available only after [[init]] is invoked.
    +   */
    +  def hostName: String
    +
    +  /**
    +   * Fetch a sequence of blocks from a remote node asynchronously,
    +   * available only after [[init]] is invoked.
    +   *
    +   * Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
    +   * while [[BlockFetchingListener.onBlockFetchSuccess]] is called once per failure.
    +   *
    +   * This takes a sequence so the implementation can batch requests.
    +   */
    +  def fetchBlocks(
    +      hostName: String,
    +      port: Int,
    +      blockIds: Seq[String],
    +      listener: BlockFetchingListener): Unit
    +
    +  /**
    +   * Fetch a single block from a remote node, synchronously,
    +   * available only after [[init]] is invoked.
    +   */
    +  def fetchBlock(hostName: String, port: Int, blockId: String): ManagedBuffer = {
    +    // TODO(rxin): Add timeout?
    +
    +    // A monitor for the thread to wait on.
    +    val lock = new Object
    +    @volatile var result: Either[ManagedBuffer, Throwable] = null
    +    fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
    +      override def onBlockFetchFailure(exception: Throwable): Unit = {
    +        lock.synchronized {
    +          result = Right(exception)
    +          lock.notify()
    +        }
    +      }
    +      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    +        lock.synchronized {
    +          result = Left(data)
    +          lock.notify()
    +        }
    +      }
    +    })
    +
    +    // Sleep until result is no longer null
    +    lock.synchronized {
    +      while (result == null) {
    +        try {
    +          lock.wait()
    +        } catch {
    +          case e: InterruptedException =>
    +        }
    +      }
    +    }
    +
    +    result match {
    +      case Left(data) => data
    +      case Right(e) => throw e
    +    }
    +  }
    +
    +  /**
    +   * Upload a single block to a remote node, available only after [[init]] is invoked.
    +   *
    +   * This call blocks until the upload completes, or throws an exception upon failures.
    +   */
    +  def uploadBlock(
    --- End diff --
    
    Why don't we have a non-blocking version of this ? (and make the blocking version a special case)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2240#issuecomment-54264464
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19644/consoleFull) for   PR 2240 at commit [`1332156`](https://github.com/apache/spark/commit/13321569b169b173444e5c8a4aab2975ebc3244d).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17029848
  
    --- Diff: core/src/main/scala/org/apache/spark/network/BlockTransferService.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network
    +
    +import org.apache.spark.storage.StorageLevel
    +
    +
    +abstract class BlockTransferService {
    +
    +  /**
    +   * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
    +   * local blocks or put local blocks.
    +   */
    +  def init(blockDataManager: BlockDataManager)
    +
    +  /**
    +   * Tear down the transfer service.
    +   */
    +  def stop(): Unit
    +
    +  /**
    +   * Port number the service is listening on, available only after [[init]] is invoked.
    +   */
    +  def port: Int
    +
    +  /**
    +   * Host name the service is listening on, available only after [[init]] is invoked.
    +   */
    +  def hostName: String
    +
    +  /**
    +   * Fetch a sequence of blocks from a remote node asynchronously,
    +   * available only after [[init]] is invoked.
    +   *
    +   * Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
    +   * while [[BlockFetchingListener.onBlockFetchSuccess]] is called once per failure.
    --- End diff --
    
    you mean onBlockFetchFailure in the second line ? And could you reword to `it is called exactly once for every failed block` ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3019] Pluggable block transfer interfac...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2240#discussion_r17032815
  
    --- Diff: core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala ---
    @@ -0,0 +1,37 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.network
    +
    +import java.util.EventListener
    +
    +
    --- End diff --
    
    mind removing this message so it doesn't show up as unaddressed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org