You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/10 13:07:06 UTC

spark git commit: [SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.

Repository: spark
Updated Branches:
  refs/heads/master 647963a26 -> 6a06c4b03


[SPARK-21342] Fix DownloadCallback to work well with RetryingBlockFetcher.

## What changes were proposed in this pull request?

When `RetryingBlockFetcher` retries fetching blocks, there could be two `DownloadCallback`s downloading the same content to the same target file. This could cause `ShuffleBlockFetcherIterator` to read a partial result.

This PR proposes to create and delete the temp files in `OneForOneBlockFetcher`.

Author: jinxing <ji...@126.com>
Author: Shixiong Zhu <zs...@gmail.com>

Closes #18565 from jinxing64/SPARK-21342.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a06c4b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a06c4b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a06c4b0

Branch: refs/heads/master
Commit: 6a06c4b03c4dd86241fb9d11b4360371488f0e53
Parents: 647963a
Author: jinxing <ji...@126.com>
Authored: Mon Jul 10 21:06:58 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Jul 10 21:06:58 2017 +0800

----------------------------------------------------------------------
 .../network/shuffle/ExternalShuffleClient.java  |  7 ++--
 .../network/shuffle/OneForOneBlockFetcher.java  | 34 +++++++++++-------
 .../spark/network/shuffle/ShuffleClient.java    | 13 +++++--
 .../network/shuffle/TempShuffleFileManager.java | 36 ++++++++++++++++++++
 .../network/sasl/SaslIntegrationSuite.java      |  2 +-
 .../shuffle/OneForOneBlockFetcherSuite.java     |  2 +-
 .../spark/network/BlockTransferService.scala    |  8 ++---
 .../netty/NettyBlockTransferService.scala       |  9 +++--
 .../storage/ShuffleBlockFetcherIterator.scala   | 28 ++++++++++-----
 .../spark/storage/BlockManagerSuite.scala       |  4 +--
 .../ShuffleBlockFetcherIteratorSuite.scala      | 10 +++---
 11 files changed, 108 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 6ac9302..31bd24e 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.shuffle;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -91,15 +90,15 @@ public class ExternalShuffleClient extends ShuffleClient {
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
-      File[] shuffleFiles) {
+      TempShuffleFileManager tempShuffleFileManager) {
     checkInit();
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
     try {
       RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
           (blockIds1, listener1) -> {
             TransportClient client = clientFactory.createClient(host, port);
-            new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1, conf,
-              shuffleFiles).start();
+            new OneForOneBlockFetcher(client, appId, execId,
+              blockIds1, listener1, conf, tempShuffleFileManager).start();
           };
 
       int maxRetries = conf.maxIORetries();

http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index d46ce2e..2f160d1 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -57,30 +57,36 @@ public class OneForOneBlockFetcher {
   private final String[] blockIds;
   private final BlockFetchingListener listener;
   private final ChunkReceivedCallback chunkCallback;
-  private TransportConf transportConf = null;
-  private File[] shuffleFiles = null;
+  private final TransportConf transportConf;
+  private final TempShuffleFileManager tempShuffleFileManager;
 
   private StreamHandle streamHandle = null;
 
   public OneForOneBlockFetcher(
+    TransportClient client,
+    String appId,
+    String execId,
+    String[] blockIds,
+    BlockFetchingListener listener,
+    TransportConf transportConf) {
+    this(client, appId, execId, blockIds, listener, transportConf, null);
+  }
+
+  public OneForOneBlockFetcher(
       TransportClient client,
       String appId,
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
       TransportConf transportConf,
-      File[] shuffleFiles) {
+      TempShuffleFileManager tempShuffleFileManager) {
     this.client = client;
     this.openMessage = new OpenBlocks(appId, execId, blockIds);
     this.blockIds = blockIds;
     this.listener = listener;
     this.chunkCallback = new ChunkCallback();
     this.transportConf = transportConf;
-    if (shuffleFiles != null) {
-      this.shuffleFiles = shuffleFiles;
-      assert this.shuffleFiles.length == blockIds.length:
-        "Number of shuffle files should equal to blocks";
-    }
+    this.tempShuffleFileManager = tempShuffleFileManager;
   }
 
   /** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */
@@ -119,9 +125,9 @@ public class OneForOneBlockFetcher {
           // Immediately request all chunks -- we expect that the total size of the request is
           // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
           for (int i = 0; i < streamHandle.numChunks; i++) {
-            if (shuffleFiles != null) {
+            if (tempShuffleFileManager != null) {
               client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
-                new DownloadCallback(shuffleFiles[i], i));
+                new DownloadCallback(i));
             } else {
               client.fetchChunk(streamHandle.streamId, i, chunkCallback);
             }
@@ -157,8 +163,8 @@ public class OneForOneBlockFetcher {
     private File targetFile = null;
     private int chunkIndex;
 
-    DownloadCallback(File targetFile, int chunkIndex) throws IOException {
-      this.targetFile = targetFile;
+    DownloadCallback(int chunkIndex) throws IOException {
+      this.targetFile = tempShuffleFileManager.createTempShuffleFile();
       this.channel = Channels.newChannel(new FileOutputStream(targetFile));
       this.chunkIndex = chunkIndex;
     }
@@ -174,6 +180,9 @@ public class OneForOneBlockFetcher {
       ManagedBuffer buffer = new FileSegmentManagedBuffer(transportConf, targetFile, 0,
         targetFile.length());
       listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
+      if (!tempShuffleFileManager.registerTempShuffleFileToClean(targetFile)) {
+        targetFile.delete();
+      }
     }
 
     @Override
@@ -182,6 +191,7 @@ public class OneForOneBlockFetcher {
       // On receipt of a failure, fail every block from chunkIndex onwards.
       String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
       failRemainingBlocks(remainingBlockIds, cause);
+      targetFile.delete();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
index 978ff5a..9e77bee 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -18,7 +18,6 @@
 package org.apache.spark.network.shuffle;
 
 import java.io.Closeable;
-import java.io.File;
 
 /** Provides an interface for reading shuffle files, either from an Executor or external service. */
 public abstract class ShuffleClient implements Closeable {
@@ -35,6 +34,16 @@ public abstract class ShuffleClient implements Closeable {
    * Note that this API takes a sequence so the implementation can batch requests, and does not
    * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
    * the data of a block is fetched, rather than waiting for all blocks to be fetched.
+   *
+   * @param host the host of the remote node.
+   * @param port the port of the remote node.
+   * @param execId the executor id.
+   * @param blockIds block ids to fetch.
+   * @param listener the listener to receive block fetching status.
+   * @param tempShuffleFileManager TempShuffleFileManager to create and clean temp shuffle files.
+   *                               If it's not <code>null</code>, the remote blocks will be streamed
+   *                               into temp shuffle files to reduce the memory usage, otherwise,
+   *                               they will be kept in memory.
    */
   public abstract void fetchBlocks(
       String host,
@@ -42,5 +51,5 @@ public abstract class ShuffleClient implements Closeable {
       String execId,
       String[] blockIds,
       BlockFetchingListener listener,
-      File[] shuffleFiles);
+      TempShuffleFileManager tempShuffleFileManager);
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java
new file mode 100644
index 0000000..84a5ed6
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempShuffleFileManager.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+
+/**
+ * A manager to create temp shuffle block files to reduce the memory usage and also clean temp
+ * files when they won't be used any more.
+ */
+public interface TempShuffleFileManager {
+
+  /** Create a temp shuffle block file. */
+  File createTempShuffleFile();
+
+  /**
+   * Register a temp shuffle file to clean up when it won't be used any more. Return whether the
+   * file is registered successfully. If `false`, the caller should clean up the file by itself.
+   */
+  boolean registerTempShuffleFileToClean(File file);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 8110f1e..02e6eb3 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -204,7 +204,7 @@ public class SaslIntegrationSuite {
 
       String[] blockIds = { "shuffle_0_1_2", "shuffle_0_3_4" };
       OneForOneBlockFetcher fetcher =
-          new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener, conf, null);
+          new OneForOneBlockFetcher(client1, "app-2", "0", blockIds, listener, conf);
       fetcher.start();
       blockFetchLatch.await();
       checkSecurityException(exception.get());

http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
----------------------------------------------------------------------
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
index 61d8221..dc947a6 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockFetcherSuite.java
@@ -131,7 +131,7 @@ public class OneForOneBlockFetcherSuite {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
     String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
     OneForOneBlockFetcher fetcher =
-      new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener, conf, null);
+      new OneForOneBlockFetcher(client, "app-id", "exec-id", blockIds, listener, conf);
 
     // Respond to the "OpenBlocks" message with an appropriate ShuffleStreamHandle with streamId 123
     doAnswer(invocationOnMock -> {

http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index 6860214..fe5fd2d 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.network
 
-import java.io.{Closeable, File}
+import java.io.Closeable
 import java.nio.ByteBuffer
 
 import scala.concurrent.{Future, Promise}
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempShuffleFileManager}
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.util.ThreadUtils
 
@@ -68,7 +68,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
       execId: String,
       blockIds: Array[String],
       listener: BlockFetchingListener,
-      shuffleFiles: Array[File]): Unit
+      tempShuffleFileManager: TempShuffleFileManager): Unit
 
   /**
    * Upload a single block to a remote node, available only after [[init]] is invoked.
@@ -101,7 +101,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
           ret.flip()
           result.success(new NioManagedBuffer(ret))
         }
-      }, shuffleFiles = null)
+      }, tempShuffleFileManager = null)
     ThreadUtils.awaitResult(result.future, Duration.Inf)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index b13a9c6..30ff938 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.network.netty
 
-import java.io.File
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConverters._
@@ -30,7 +29,7 @@ import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
 import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
 import org.apache.spark.network.server._
-import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher, TempShuffleFileManager}
 import org.apache.spark.network.shuffle.protocol.UploadBlock
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.serializer.JavaSerializer
@@ -90,14 +89,14 @@ private[spark] class NettyBlockTransferService(
       execId: String,
       blockIds: Array[String],
       listener: BlockFetchingListener,
-      shuffleFiles: Array[File]): Unit = {
+      tempShuffleFileManager: TempShuffleFileManager): Unit = {
     logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
     try {
       val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
         override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
           val client = clientFactory.createClient(host, port)
-          new OneForOneBlockFetcher(client, appId, execId, blockIds.toArray, listener,
-            transportConf, shuffleFiles).start()
+          new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
+            transportConf, tempShuffleFileManager).start()
         }
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index a10f1fe..81d822d 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempShuffleFileManager}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.Utils
 import org.apache.spark.util.io.ChunkedByteBufferOutputStream
@@ -66,7 +66,7 @@ final class ShuffleBlockFetcherIterator(
     maxReqsInFlight: Int,
     maxReqSizeShuffleToMem: Long,
     detectCorrupt: Boolean)
-  extends Iterator[(BlockId, InputStream)] with Logging {
+  extends Iterator[(BlockId, InputStream)] with TempShuffleFileManager with Logging {
 
   import ShuffleBlockFetcherIterator._
 
@@ -135,7 +135,8 @@ final class ShuffleBlockFetcherIterator(
    * A set to store the files used for shuffling remote huge blocks. Files in this set will be
    * deleted when cleanup. This is a layer of defensiveness against disk file leaks.
    */
-  val shuffleFilesSet = mutable.HashSet[File]()
+  @GuardedBy("this")
+  private[this] val shuffleFilesSet = mutable.HashSet[File]()
 
   initialize()
 
@@ -149,6 +150,19 @@ final class ShuffleBlockFetcherIterator(
     currentResult = null
   }
 
+  override def createTempShuffleFile(): File = {
+    blockManager.diskBlockManager.createTempLocalBlock()._2
+  }
+
+  override def registerTempShuffleFileToClean(file: File): Boolean = synchronized {
+    if (isZombie) {
+      false
+    } else {
+      shuffleFilesSet += file
+      true
+    }
+  }
+
   /**
    * Mark the iterator as zombie, and release all buffers that haven't been deserialized yet.
    */
@@ -176,7 +190,7 @@ final class ShuffleBlockFetcherIterator(
     }
     shuffleFilesSet.foreach { file =>
       if (!file.delete()) {
-        logInfo("Failed to cleanup shuffle fetch temp file " + file.getAbsolutePath());
+        logWarning("Failed to cleanup shuffle fetch temp file " + file.getAbsolutePath())
       }
     }
   }
@@ -221,12 +235,8 @@ final class ShuffleBlockFetcherIterator(
     // already encrypted and compressed over the wire(w.r.t. the related configs), we can just fetch
     // the data and write it to file directly.
     if (req.size > maxReqSizeShuffleToMem) {
-      val shuffleFiles = blockIds.map { _ =>
-        blockManager.diskBlockManager.createTempLocalBlock()._2
-      }.toArray
-      shuffleFilesSet ++= shuffleFiles
       shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
-        blockFetchingListener, shuffleFiles)
+        blockFetchingListener, this)
     } else {
       shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
         blockFetchingListener, null)

http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 086adcc..755a61a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -45,7 +45,7 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf}
 import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap}
-import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempShuffleFileManager}
 import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
@@ -1382,7 +1382,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         execId: String,
         blockIds: Array[String],
         listener: BlockFetchingListener,
-        shuffleFiles: Array[File]): Unit = {
+        tempShuffleFileManager: TempShuffleFileManager): Unit = {
       listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1)))
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6a06c4b0/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 559b3fa..6a70ced 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.{SparkFunSuite, TaskContext}
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.network.shuffle.{BlockFetchingListener, TempShuffleFileManager}
 import org.apache.spark.network.util.LimitedInputStream
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.Utils
@@ -432,12 +432,12 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     val remoteBlocks = Map[BlockId, ManagedBuffer](
       ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer())
     val transfer = mock(classOf[BlockTransferService])
-    var shuffleFiles: Array[File] = null
+    var tempShuffleFileManager: TempShuffleFileManager = null
     when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any()))
       .thenAnswer(new Answer[Unit] {
         override def answer(invocation: InvocationOnMock): Unit = {
           val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
-          shuffleFiles = invocation.getArguments()(5).asInstanceOf[Array[File]]
+          tempShuffleFileManager = invocation.getArguments()(5).asInstanceOf[TempShuffleFileManager]
           Future {
             listener.onBlockFetchSuccess(
               ShuffleBlockId(0, 0, 0).toString, remoteBlocks(ShuffleBlockId(0, 0, 0)))
@@ -466,13 +466,13 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     fetchShuffleBlock(blocksByAddress1)
     // `maxReqSizeShuffleToMem` is 200, which is greater than the block size 100, so don't fetch
     // shuffle block to disk.
-    assert(shuffleFiles === null)
+    assert(tempShuffleFileManager == null)
 
     val blocksByAddress2 = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
       (remoteBmId, remoteBlocks.keys.map(blockId => (blockId, 300L)).toSeq))
     fetchShuffleBlock(blocksByAddress2)
     // `maxReqSizeShuffleToMem` is 200, which is smaller than the block size 300, so fetch
     // shuffle block to disk.
-    assert(shuffleFiles != null)
+    assert(tempShuffleFileManager != null)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org