You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/03/04 06:27:28 UTC

git commit: Remove broken/unused Connection.getChunkFIFO method.

Repository: spark
Updated Branches:
  refs/heads/master f5ae38af8 -> b14ede789


Remove broken/unused Connection.getChunkFIFO method.

This method appears to be broken: it never removes anything from
the messages queue, and it re-appends the message at the head of the
queue each time a chunk is returned, so once that message has no more
chunks to send the while loop becomes an infinite loop.  The method also
does not appear to have ever been used since the code was added in 2012,
so this commit removes it.

cc @mateiz who originally added this method in case there's a reason it should be here! (https://github.com/apache/spark/commit/63051dd2bcc4bf09d413ff7cf89a37967edc33ba)

Author: Kay Ousterhout <ka...@gmail.com>

Closes #69 from kayousterhout/remove_get_fifo and squashes the following commits:

053bc59 [Kay Ousterhout] Remove broken/unused Connection.getChunkFIFO method.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b14ede78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b14ede78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b14ede78

Branch: refs/heads/master
Commit: b14ede789abfabe25144385e8dc2fb96691aba81
Parents: f5ae38a
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Mon Mar 3 21:27:18 2014 -0800
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Mon Mar 3 21:27:18 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/network/Connection.scala   | 36 ++------------------
 1 file changed, 2 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b14ede78/core/src/main/scala/org/apache/spark/network/Connection.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index f2e3c1a..8219a18 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -171,7 +171,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
     remoteId_ : ConnectionManagerId)
   extends Connection(SocketChannel.open, selector_, remoteId_) {
 
-  private class Outbox(fair: Int = 0) {
+  private class Outbox {
     val messages = new Queue[Message]()
     val defaultChunkSize = 65536  //32768 //16384
     var nextMessageToBeUsed = 0
@@ -186,38 +186,6 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
     }
 
     def getChunk(): Option[MessageChunk] = {
-      fair match {
-        case 0 => getChunkFIFO()
-        case 1 => getChunkRR()
-        case _ => throw new Exception("Unexpected fairness policy in outbox")
-      }
-    }
-
-    private def getChunkFIFO(): Option[MessageChunk] = {
-      /*logInfo("Using FIFO")*/
-      messages.synchronized {
-        while (!messages.isEmpty) {
-          val message = messages(0)
-          val chunk = message.getChunkForSending(defaultChunkSize)
-          if (chunk.isDefined) {
-            messages += message  // this is probably incorrect, it wont work as fifo
-            if (!message.started) {
-              logDebug("Starting to send [" + message + "]")
-              message.started = true
-              message.startTime = System.currentTimeMillis
-            }
-            return chunk
-          } else {
-            message.finishTime = System.currentTimeMillis
-            logDebug("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() +
-              "] in "  + message.timeTaken )
-          }
-        }
-      }
-      None
-    }
-
-    private def getChunkRR(): Option[MessageChunk] = {
       messages.synchronized {
         while (!messages.isEmpty) {
           /*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
@@ -249,7 +217,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
 
   // outbox is used as a lock - ensure that it is always used as a leaf (since methods which 
   // lock it are invoked in context of other locks)
-  private val outbox = new Outbox(1)
+  private val outbox = new Outbox()
   /*
     This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly 
     different purpose. This flag is to see if we need to force reregister for write even when we