You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/02/28 06:53:09 UTC

git commit: Remove BlockFetchTracker trait

Repository: spark
Updated Branches:
  refs/heads/master 40e080a68 -> edf8a56ab


Remove BlockFetchTracker trait

This trait seems to have been created a while ago when there
were multiple implementations; now that there's just one, I think it
makes sense to merge it into the BlockFetcherIterator trait.

Author: Kay Ousterhout <ka...@gmail.com>

Closes #39 from kayousterhout/remove_tracker and squashes the following commits:

8173939 [Kay Ousterhout] Remove BlockFetchTracker.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edf8a56a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edf8a56a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edf8a56a

Branch: refs/heads/master
Commit: edf8a56ab7eaee1f7c3b4579eb10464984d31d7a
Parents: 40e080a
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Thu Feb 27 21:52:55 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Feb 27 21:52:55 2014 -0800

----------------------------------------------------------------------
 .../spark/storage/BlockFetchTracker.scala       | 27 -------------------
 .../spark/storage/BlockFetcherIterator.scala    | 28 ++++++++++++--------
 2 files changed, 17 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/edf8a56a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala
deleted file mode 100644
index 2e0b0e6..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetchTracker.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-private[spark] trait BlockFetchTracker {
-  def totalBlocks : Int
-  def numLocalBlocks: Int
-  def numRemoteBlocks: Int
-  def remoteFetchTime : Long
-  def fetchWaitTime: Long
-  def remoteBytesRead : Long
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/edf8a56a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index 925022e..fb50b45 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -44,9 +44,14 @@ import org.apache.spark.util.Utils
  */
 
 private[storage]
-trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])]
-  with Logging with BlockFetchTracker {
+trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging {
   def initialize()
+  def totalBlocks: Int
+  def numLocalBlocks: Int
+  def numRemoteBlocks: Int
+  def remoteFetchTime: Long
+  def fetchWaitTime: Long
+  def remoteBytesRead: Long
 }
 
 
@@ -233,7 +238,16 @@ object BlockFetcherIterator {
       logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
     }
 
-    //an iterator that will read fetched blocks off the queue as they arrive.
+    override def totalBlocks: Int = numLocal + numRemote
+    override def numLocalBlocks: Int = numLocal
+    override def numRemoteBlocks: Int = numRemote
+    override def remoteFetchTime: Long = _remoteFetchTime
+    override def fetchWaitTime: Long = _fetchWaitTime
+    override def remoteBytesRead: Long = _remoteBytesRead
+ 
+
+    // Implementing the Iterator methods with an iterator that reads fetched blocks off the queue
+    // as they arrive.
     @volatile protected var resultsGotten = 0
 
     override def hasNext: Boolean = resultsGotten < _numBlocksToFetch
@@ -251,14 +265,6 @@ object BlockFetcherIterator {
       }
       (result.blockId, if (result.failed) None else Some(result.deserialize()))
     }
-
-    // Implementing BlockFetchTracker trait.
-    override def totalBlocks: Int = numLocal + numRemote
-    override def numLocalBlocks: Int = numLocal
-    override def numRemoteBlocks: Int = numRemote
-    override def remoteFetchTime: Long = _remoteFetchTime
-    override def fetchWaitTime: Long = _fetchWaitTime
-    override def remoteBytesRead: Long = _remoteBytesRead
   }
   // End of BasicBlockFetcherIterator