You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:05 UTC
[21/69] [abbrv] [partial] Initial work to rename package to
org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/BufferMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/BufferMessage.scala b/core/src/main/scala/spark/network/BufferMessage.scala
deleted file mode 100644
index e566aea..0000000
--- a/core/src/main/scala/spark/network/BufferMessage.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.ArrayBuffer
-
-import spark.storage.BlockManager
-
-
-private[spark]
-class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId: Int)
- extends Message(Message.BUFFER_MESSAGE, id_) {
-
- val initialSize = currentSize()
- var gotChunkForSendingOnce = false
-
- def size = initialSize
-
- def currentSize() = {
- if (buffers == null || buffers.isEmpty) {
- 0
- } else {
- buffers.map(_.remaining).reduceLeft(_ + _)
- }
- }
-
- def getChunkForSending(maxChunkSize: Int): Option[MessageChunk] = {
- if (maxChunkSize <= 0) {
- throw new Exception("Max chunk size is " + maxChunkSize)
- }
-
- if (size == 0 && gotChunkForSendingOnce == false) {
- val newChunk = new MessageChunk(
- new MessageChunkHeader(typ, id, 0, 0, ackId, senderAddress), null)
- gotChunkForSendingOnce = true
- return Some(newChunk)
- }
-
- while(!buffers.isEmpty) {
- val buffer = buffers(0)
- if (buffer.remaining == 0) {
- BlockManager.dispose(buffer)
- buffers -= buffer
- } else {
- val newBuffer = if (buffer.remaining <= maxChunkSize) {
- buffer.duplicate()
- } else {
- buffer.slice().limit(maxChunkSize).asInstanceOf[ByteBuffer]
- }
- buffer.position(buffer.position + newBuffer.remaining)
- val newChunk = new MessageChunk(new MessageChunkHeader(
- typ, id, size, newBuffer.remaining, ackId, senderAddress), newBuffer)
- gotChunkForSendingOnce = true
- return Some(newChunk)
- }
- }
- None
- }
-
- def getChunkForReceiving(chunkSize: Int): Option[MessageChunk] = {
- // STRONG ASSUMPTION: BufferMessage created when receiving data has ONLY ONE data buffer
- if (buffers.size > 1) {
- throw new Exception("Attempting to get chunk from message with multiple data buffers")
- }
- val buffer = buffers(0)
- if (buffer.remaining > 0) {
- if (buffer.remaining < chunkSize) {
- throw new Exception("Not enough space in data buffer for receiving chunk")
- }
- val newBuffer = buffer.slice().limit(chunkSize).asInstanceOf[ByteBuffer]
- buffer.position(buffer.position + newBuffer.remaining)
- val newChunk = new MessageChunk(new MessageChunkHeader(
- typ, id, size, newBuffer.remaining, ackId, senderAddress), newBuffer)
- return Some(newChunk)
- }
- None
- }
-
- def flip() {
- buffers.foreach(_.flip)
- }
-
- def hasAckId() = (ackId != 0)
-
- def isCompletelyReceived() = !buffers(0).hasRemaining
-
- override def toString = {
- if (hasAckId) {
- "BufferAckMessage(aid = " + ackId + ", id = " + id + ", size = " + size + ")"
- } else {
- "BufferMessage(id = " + id + ", size = " + size + ")"
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/Connection.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
deleted file mode 100644
index 1e571d3..0000000
--- a/core/src/main/scala/spark/network/Connection.scala
+++ /dev/null
@@ -1,586 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network
-
-import spark._
-
-import scala.collection.mutable.{HashMap, Queue, ArrayBuffer}
-
-import java.io._
-import java.nio._
-import java.nio.channels._
-import java.nio.channels.spi._
-import java.net._
-
-
-private[spark]
-abstract class Connection(val channel: SocketChannel, val selector: Selector,
- val socketRemoteConnectionManagerId: ConnectionManagerId)
- extends Logging {
-
- def this(channel_ : SocketChannel, selector_ : Selector) = {
- this(channel_, selector_,
- ConnectionManagerId.fromSocketAddress(
- channel_.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]))
- }
-
- channel.configureBlocking(false)
- channel.socket.setTcpNoDelay(true)
- channel.socket.setReuseAddress(true)
- channel.socket.setKeepAlive(true)
- /*channel.socket.setReceiveBufferSize(32768) */
-
- @volatile private var closed = false
- var onCloseCallback: Connection => Unit = null
- var onExceptionCallback: (Connection, Exception) => Unit = null
- var onKeyInterestChangeCallback: (Connection, Int) => Unit = null
-
- val remoteAddress = getRemoteAddress()
-
- def resetForceReregister(): Boolean
-
- // Read channels typically do not register for write and write does not for read
- // Now, we do have write registering for read too (temporarily), but this is to detect
- // channel close NOT to actually read/consume data on it !
- // How does this work if/when we move to SSL ?
-
- // What is the interest to register with selector for when we want this connection to be selected
- def registerInterest()
-
- // What is the interest to register with selector for when we want this connection to
- // be de-selected
- // Traditionally, 0 - but in our case, for example, for close-detection on SendingConnection hack,
- // it will be SelectionKey.OP_READ (until we fix it properly)
- def unregisterInterest()
-
- // On receiving a read event, should we change the interest for this channel or not ?
- // Will be true for ReceivingConnection, false for SendingConnection.
- def changeInterestForRead(): Boolean
-
- // On receiving a write event, should we change the interest for this channel or not ?
- // Will be false for ReceivingConnection, true for SendingConnection.
- // Actually, for now, should not get triggered for ReceivingConnection
- def changeInterestForWrite(): Boolean
-
- def getRemoteConnectionManagerId(): ConnectionManagerId = {
- socketRemoteConnectionManagerId
- }
-
- def key() = channel.keyFor(selector)
-
- def getRemoteAddress() = channel.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]
-
- // Returns whether we have to register for further reads or not.
- def read(): Boolean = {
- throw new UnsupportedOperationException(
- "Cannot read on connection of type " + this.getClass.toString)
- }
-
- // Returns whether we have to register for further writes or not.
- def write(): Boolean = {
- throw new UnsupportedOperationException(
- "Cannot write on connection of type " + this.getClass.toString)
- }
-
- def close() {
- closed = true
- val k = key()
- if (k != null) {
- k.cancel()
- }
- channel.close()
- callOnCloseCallback()
- }
-
- protected def isClosed: Boolean = closed
-
- def onClose(callback: Connection => Unit) {
- onCloseCallback = callback
- }
-
- def onException(callback: (Connection, Exception) => Unit) {
- onExceptionCallback = callback
- }
-
- def onKeyInterestChange(callback: (Connection, Int) => Unit) {
- onKeyInterestChangeCallback = callback
- }
-
- def callOnExceptionCallback(e: Exception) {
- if (onExceptionCallback != null) {
- onExceptionCallback(this, e)
- } else {
- logError("Error in connection to " + getRemoteConnectionManagerId() +
- " and OnExceptionCallback not registered", e)
- }
- }
-
- def callOnCloseCallback() {
- if (onCloseCallback != null) {
- onCloseCallback(this)
- } else {
- logWarning("Connection to " + getRemoteConnectionManagerId() +
- " closed and OnExceptionCallback not registered")
- }
-
- }
-
- def changeConnectionKeyInterest(ops: Int) {
- if (onKeyInterestChangeCallback != null) {
- onKeyInterestChangeCallback(this, ops)
- } else {
- throw new Exception("OnKeyInterestChangeCallback not registered")
- }
- }
-
- def printRemainingBuffer(buffer: ByteBuffer) {
- val bytes = new Array[Byte](buffer.remaining)
- val curPosition = buffer.position
- buffer.get(bytes)
- bytes.foreach(x => print(x + " "))
- buffer.position(curPosition)
- print(" (" + bytes.size + ")")
- }
-
- def printBuffer(buffer: ByteBuffer, position: Int, length: Int) {
- val bytes = new Array[Byte](length)
- val curPosition = buffer.position
- buffer.position(position)
- buffer.get(bytes)
- bytes.foreach(x => print(x + " "))
- print(" (" + position + ", " + length + ")")
- buffer.position(curPosition)
- }
-}
-
-
-private[spark]
-class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
- remoteId_ : ConnectionManagerId)
- extends Connection(SocketChannel.open, selector_, remoteId_) {
-
- private class Outbox(fair: Int = 0) {
- val messages = new Queue[Message]()
- val defaultChunkSize = 65536 //32768 //16384
- var nextMessageToBeUsed = 0
-
- def addMessage(message: Message) {
- messages.synchronized{
- /*messages += message*/
- messages.enqueue(message)
- logDebug("Added [" + message + "] to outbox for sending to " +
- "[" + getRemoteConnectionManagerId() + "]")
- }
- }
-
- def getChunk(): Option[MessageChunk] = {
- fair match {
- case 0 => getChunkFIFO()
- case 1 => getChunkRR()
- case _ => throw new Exception("Unexpected fairness policy in outbox")
- }
- }
-
- private def getChunkFIFO(): Option[MessageChunk] = {
- /*logInfo("Using FIFO")*/
- messages.synchronized {
- while (!messages.isEmpty) {
- val message = messages(0)
- val chunk = message.getChunkForSending(defaultChunkSize)
- if (chunk.isDefined) {
- messages += message // this is probably incorrect, it wont work as fifo
- if (!message.started) {
- logDebug("Starting to send [" + message + "]")
- message.started = true
- message.startTime = System.currentTimeMillis
- }
- return chunk
- } else {
- /*logInfo("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() + "]")*/
- message.finishTime = System.currentTimeMillis
- logDebug("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() +
- "] in " + message.timeTaken )
- }
- }
- }
- None
- }
-
- private def getChunkRR(): Option[MessageChunk] = {
- messages.synchronized {
- while (!messages.isEmpty) {
- /*nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
- /*val message = messages(nextMessageToBeUsed)*/
- val message = messages.dequeue
- val chunk = message.getChunkForSending(defaultChunkSize)
- if (chunk.isDefined) {
- messages.enqueue(message)
- nextMessageToBeUsed = nextMessageToBeUsed + 1
- if (!message.started) {
- logDebug(
- "Starting to send [" + message + "] to [" + getRemoteConnectionManagerId() + "]")
- message.started = true
- message.startTime = System.currentTimeMillis
- }
- logTrace(
- "Sending chunk from [" + message+ "] to [" + getRemoteConnectionManagerId() + "]")
- return chunk
- } else {
- message.finishTime = System.currentTimeMillis
- logDebug("Finished sending [" + message + "] to [" + getRemoteConnectionManagerId() +
- "] in " + message.timeTaken )
- }
- }
- }
- None
- }
- }
-
- // outbox is used as a lock - ensure that it is always used as a leaf (since methods which
- // lock it are invoked in context of other locks)
- private val outbox = new Outbox(1)
- /*
- This is orthogonal to whether we have pending bytes to write or not - and satisfies a slightly
- different purpose. This flag is to see if we need to force reregister for write even when we
- do not have any pending bytes to write to socket.
- This can happen due to a race between adding pending buffers, and checking for existing of
- data as detailed in https://github.com/mesos/spark/pull/791
- */
- private var needForceReregister = false
- val currentBuffers = new ArrayBuffer[ByteBuffer]()
-
- /*channel.socket.setSendBufferSize(256 * 1024)*/
-
- override def getRemoteAddress() = address
-
- val DEFAULT_INTEREST = SelectionKey.OP_READ
-
- override def registerInterest() {
- // Registering read too - does not really help in most cases, but for some
- // it does - so let us keep it for now.
- changeConnectionKeyInterest(SelectionKey.OP_WRITE | DEFAULT_INTEREST)
- }
-
- override def unregisterInterest() {
- changeConnectionKeyInterest(DEFAULT_INTEREST)
- }
-
- def send(message: Message) {
- outbox.synchronized {
- outbox.addMessage(message)
- needForceReregister = true
- }
- if (channel.isConnected) {
- registerInterest()
- }
- }
-
- // return previous value after resetting it.
- def resetForceReregister(): Boolean = {
- outbox.synchronized {
- val result = needForceReregister
- needForceReregister = false
- result
- }
- }
-
- // MUST be called within the selector loop
- def connect() {
- try{
- channel.register(selector, SelectionKey.OP_CONNECT)
- channel.connect(address)
- logInfo("Initiating connection to [" + address + "]")
- } catch {
- case e: Exception => {
- logError("Error connecting to " + address, e)
- callOnExceptionCallback(e)
- }
- }
- }
-
- def finishConnect(force: Boolean): Boolean = {
- try {
- // Typically, this should finish immediately since it was triggered by a connect
- // selection - though need not necessarily always complete successfully.
- val connected = channel.finishConnect
- if (!force && !connected) {
- logInfo(
- "finish connect failed [" + address + "], " + outbox.messages.size + " messages pending")
- return false
- }
-
- // Fallback to previous behavior - assume finishConnect completed
- // This will happen only when finishConnect failed for some repeated number of times
- // (10 or so)
- // Is highly unlikely unless there was an unclean close of socket, etc
- registerInterest()
- logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending")
- return true
- } catch {
- case e: Exception => {
- logWarning("Error finishing connection to " + address, e)
- callOnExceptionCallback(e)
- // ignore
- return true
- }
- }
- }
-
- override def write(): Boolean = {
- try {
- while (true) {
- if (currentBuffers.size == 0) {
- outbox.synchronized {
- outbox.getChunk() match {
- case Some(chunk) => {
- val buffers = chunk.buffers
- // If we have 'seen' pending messages, then reset flag - since we handle that as normal
- // registering of event (below)
- if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister()
- currentBuffers ++= buffers
- }
- case None => {
- // changeConnectionKeyInterest(0)
- /*key.interestOps(0)*/
- return false
- }
- }
- }
- }
-
- if (currentBuffers.size > 0) {
- val buffer = currentBuffers(0)
- val remainingBytes = buffer.remaining
- val writtenBytes = channel.write(buffer)
- if (buffer.remaining == 0) {
- currentBuffers -= buffer
- }
- if (writtenBytes < remainingBytes) {
- // re-register for write.
- return true
- }
- }
- }
- } catch {
- case e: Exception => {
- logWarning("Error writing in connection to " + getRemoteConnectionManagerId(), e)
- callOnExceptionCallback(e)
- close()
- return false
- }
- }
- // should not happen - to keep scala compiler happy
- return true
- }
-
- // This is a hack to determine if remote socket was closed or not.
- // SendingConnection DOES NOT expect to receive any data - if it does, it is an error
- // For a bunch of cases, read will return -1 in case remote socket is closed : hence we
- // register for reads to determine that.
- override def read(): Boolean = {
- // We don't expect the other side to send anything; so, we just read to detect an error or EOF.
- try {
- val length = channel.read(ByteBuffer.allocate(1))
- if (length == -1) { // EOF
- close()
- } else if (length > 0) {
- logWarning(
- "Unexpected data read from SendingConnection to " + getRemoteConnectionManagerId())
- }
- } catch {
- case e: Exception =>
- logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(), e)
- callOnExceptionCallback(e)
- close()
- }
-
- false
- }
-
- override def changeInterestForRead(): Boolean = false
-
- override def changeInterestForWrite(): Boolean = ! isClosed
-}
-
-
-// Must be created within selector loop - else deadlock
-private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : Selector)
- extends Connection(channel_, selector_) {
-
- class Inbox() {
- val messages = new HashMap[Int, BufferMessage]()
-
- def getChunk(header: MessageChunkHeader): Option[MessageChunk] = {
-
- def createNewMessage: BufferMessage = {
- val newMessage = Message.create(header).asInstanceOf[BufferMessage]
- newMessage.started = true
- newMessage.startTime = System.currentTimeMillis
- logDebug(
- "Starting to receive [" + newMessage + "] from [" + getRemoteConnectionManagerId() + "]")
- messages += ((newMessage.id, newMessage))
- newMessage
- }
-
- val message = messages.getOrElseUpdate(header.id, createNewMessage)
- logTrace(
- "Receiving chunk of [" + message + "] from [" + getRemoteConnectionManagerId() + "]")
- message.getChunkForReceiving(header.chunkSize)
- }
-
- def getMessageForChunk(chunk: MessageChunk): Option[BufferMessage] = {
- messages.get(chunk.header.id)
- }
-
- def removeMessage(message: Message) {
- messages -= message.id
- }
- }
-
- @volatile private var inferredRemoteManagerId: ConnectionManagerId = null
-
- override def getRemoteConnectionManagerId(): ConnectionManagerId = {
- val currId = inferredRemoteManagerId
- if (currId != null) currId else super.getRemoteConnectionManagerId()
- }
-
- // The reciever's remote address is the local socket on remote side : which is NOT
- // the connection manager id of the receiver.
- // We infer that from the messages we receive on the receiver socket.
- private def processConnectionManagerId(header: MessageChunkHeader) {
- val currId = inferredRemoteManagerId
- if (header.address == null || currId != null) return
-
- val managerId = ConnectionManagerId.fromSocketAddress(header.address)
-
- if (managerId != null) {
- inferredRemoteManagerId = managerId
- }
- }
-
-
- val inbox = new Inbox()
- val headerBuffer: ByteBuffer = ByteBuffer.allocate(MessageChunkHeader.HEADER_SIZE)
- var onReceiveCallback: (Connection , Message) => Unit = null
- var currentChunk: MessageChunk = null
-
- channel.register(selector, SelectionKey.OP_READ)
-
- override def read(): Boolean = {
- try {
- while (true) {
- if (currentChunk == null) {
- val headerBytesRead = channel.read(headerBuffer)
- if (headerBytesRead == -1) {
- close()
- return false
- }
- if (headerBuffer.remaining > 0) {
- // re-register for read event ...
- return true
- }
- headerBuffer.flip
- if (headerBuffer.remaining != MessageChunkHeader.HEADER_SIZE) {
- throw new Exception(
- "Unexpected number of bytes (" + headerBuffer.remaining + ") in the header")
- }
- val header = MessageChunkHeader.create(headerBuffer)
- headerBuffer.clear()
-
- processConnectionManagerId(header)
-
- header.typ match {
- case Message.BUFFER_MESSAGE => {
- if (header.totalSize == 0) {
- if (onReceiveCallback != null) {
- onReceiveCallback(this, Message.create(header))
- }
- currentChunk = null
- // re-register for read event ...
- return true
- } else {
- currentChunk = inbox.getChunk(header).orNull
- }
- }
- case _ => throw new Exception("Message of unknown type received")
- }
- }
-
- if (currentChunk == null) throw new Exception("No message chunk to receive data")
-
- val bytesRead = channel.read(currentChunk.buffer)
- if (bytesRead == 0) {
- // re-register for read event ...
- return true
- } else if (bytesRead == -1) {
- close()
- return false
- }
-
- /*logDebug("Read " + bytesRead + " bytes for the buffer")*/
-
- if (currentChunk.buffer.remaining == 0) {
- /*println("Filled buffer at " + System.currentTimeMillis)*/
- val bufferMessage = inbox.getMessageForChunk(currentChunk).get
- if (bufferMessage.isCompletelyReceived) {
- bufferMessage.flip
- bufferMessage.finishTime = System.currentTimeMillis
- logDebug("Finished receiving [" + bufferMessage + "] from " +
- "[" + getRemoteConnectionManagerId() + "] in " + bufferMessage.timeTaken)
- if (onReceiveCallback != null) {
- onReceiveCallback(this, bufferMessage)
- }
- inbox.removeMessage(bufferMessage)
- }
- currentChunk = null
- }
- }
- } catch {
- case e: Exception => {
- logWarning("Error reading from connection to " + getRemoteConnectionManagerId(), e)
- callOnExceptionCallback(e)
- close()
- return false
- }
- }
- // should not happen - to keep scala compiler happy
- return true
- }
-
- def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback}
-
- // override def changeInterestForRead(): Boolean = ! isClosed
- override def changeInterestForRead(): Boolean = true
-
- override def changeInterestForWrite(): Boolean = {
- throw new IllegalStateException("Unexpected invocation right now")
- }
-
- override def registerInterest() {
- // Registering read too - does not really help in most cases, but for some
- // it does - so let us keep it for now.
- changeConnectionKeyInterest(SelectionKey.OP_READ)
- }
-
- override def unregisterInterest() {
- changeConnectionKeyInterest(0)
- }
-
- // For read conn, always false.
- override def resetForceReregister(): Boolean = false
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala
deleted file mode 100644
index 8b9f3ae..0000000
--- a/core/src/main/scala/spark/network/ConnectionManager.scala
+++ /dev/null
@@ -1,720 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network
-
-import spark._
-
-import java.nio._
-import java.nio.channels._
-import java.nio.channels.spi._
-import java.net._
-import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
-
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.SynchronizedMap
-import scala.collection.mutable.SynchronizedQueue
-import scala.collection.mutable.ArrayBuffer
-
-import akka.dispatch.{Await, Promise, ExecutionContext, Future}
-import akka.util.Duration
-import akka.util.duration._
-
-
-private[spark] class ConnectionManager(port: Int) extends Logging {
-
- class MessageStatus(
- val message: Message,
- val connectionManagerId: ConnectionManagerId,
- completionHandler: MessageStatus => Unit) {
-
- var ackMessage: Option[Message] = None
- var attempted = false
- var acked = false
-
- def markDone() { completionHandler(this) }
- }
-
- private val selector = SelectorProvider.provider.openSelector()
-
- private val handleMessageExecutor = new ThreadPoolExecutor(
- System.getProperty("spark.core.connection.handler.threads.min","20").toInt,
- System.getProperty("spark.core.connection.handler.threads.max","60").toInt,
- System.getProperty("spark.core.connection.handler.threads.keepalive","60").toInt, TimeUnit.SECONDS,
- new LinkedBlockingDeque[Runnable]())
-
- private val handleReadWriteExecutor = new ThreadPoolExecutor(
- System.getProperty("spark.core.connection.io.threads.min","4").toInt,
- System.getProperty("spark.core.connection.io.threads.max","32").toInt,
- System.getProperty("spark.core.connection.io.threads.keepalive","60").toInt, TimeUnit.SECONDS,
- new LinkedBlockingDeque[Runnable]())
-
- // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
- private val handleConnectExecutor = new ThreadPoolExecutor(
- System.getProperty("spark.core.connection.connect.threads.min","1").toInt,
- System.getProperty("spark.core.connection.connect.threads.max","8").toInt,
- System.getProperty("spark.core.connection.connect.threads.keepalive","60").toInt, TimeUnit.SECONDS,
- new LinkedBlockingDeque[Runnable]())
-
- private val serverChannel = ServerSocketChannel.open()
- private val connectionsByKey = new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
- private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection] with SynchronizedMap[ConnectionManagerId, SendingConnection]
- private val messageStatuses = new HashMap[Int, MessageStatus]
- private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
- private val registerRequests = new SynchronizedQueue[SendingConnection]
-
- implicit val futureExecContext = ExecutionContext.fromExecutor(Utils.newDaemonCachedThreadPool())
-
- private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
-
- serverChannel.configureBlocking(false)
- serverChannel.socket.setReuseAddress(true)
- serverChannel.socket.setReceiveBufferSize(256 * 1024)
-
- serverChannel.socket.bind(new InetSocketAddress(port))
- serverChannel.register(selector, SelectionKey.OP_ACCEPT)
-
- val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
- logInfo("Bound socket to port " + serverChannel.socket.getLocalPort() + " with id = " + id)
-
- private val selectorThread = new Thread("connection-manager-thread") {
- override def run() = ConnectionManager.this.run()
- }
- selectorThread.setDaemon(true)
- selectorThread.start()
-
- private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
-
- private def triggerWrite(key: SelectionKey) {
- val conn = connectionsByKey.getOrElse(key, null)
- if (conn == null) return
-
- writeRunnableStarted.synchronized {
- // So that we do not trigger more write events while processing this one.
- // The write method will re-register when done.
- if (conn.changeInterestForWrite()) conn.unregisterInterest()
- if (writeRunnableStarted.contains(key)) {
- // key.interestOps(key.interestOps() & ~ SelectionKey.OP_WRITE)
- return
- }
-
- writeRunnableStarted += key
- }
- handleReadWriteExecutor.execute(new Runnable {
- override def run() {
- var register: Boolean = false
- try {
- register = conn.write()
- } finally {
- writeRunnableStarted.synchronized {
- writeRunnableStarted -= key
- val needReregister = register || conn.resetForceReregister()
- if (needReregister && conn.changeInterestForWrite()) {
- conn.registerInterest()
- }
- }
- }
- }
- } )
- }
-
- private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
-
- private def triggerRead(key: SelectionKey) {
- val conn = connectionsByKey.getOrElse(key, null)
- if (conn == null) return
-
- readRunnableStarted.synchronized {
- // So that we do not trigger more read events while processing this one.
- // The read method will re-register when done.
- if (conn.changeInterestForRead())conn.unregisterInterest()
- if (readRunnableStarted.contains(key)) {
- return
- }
-
- readRunnableStarted += key
- }
- handleReadWriteExecutor.execute(new Runnable {
- override def run() {
- var register: Boolean = false
- try {
- register = conn.read()
- } finally {
- readRunnableStarted.synchronized {
- readRunnableStarted -= key
- if (register && conn.changeInterestForRead()) {
- conn.registerInterest()
- }
- }
- }
- }
- } )
- }
-
- private def triggerConnect(key: SelectionKey) {
- val conn = connectionsByKey.getOrElse(key, null).asInstanceOf[SendingConnection]
- if (conn == null) return
-
- // prevent other events from being triggered
- // Since we are still trying to connect, we do not need to do the additional steps in triggerWrite
- conn.changeConnectionKeyInterest(0)
-
- handleConnectExecutor.execute(new Runnable {
- override def run() {
-
- var tries: Int = 10
- while (tries >= 0) {
- if (conn.finishConnect(false)) return
- // Sleep ?
- Thread.sleep(1)
- tries -= 1
- }
-
- // fallback to previous behavior : we should not really come here since this method was
- // triggered since channel became connectable : but at times, the first finishConnect need not
- // succeed : hence the loop to retry a few 'times'.
- conn.finishConnect(true)
- }
- } )
- }
-
- // MUST be called within selector loop - else deadlock.
- private def triggerForceCloseByException(key: SelectionKey, e: Exception) {
- try {
- key.interestOps(0)
- } catch {
- // ignore exceptions
- case e: Exception => logDebug("Ignoring exception", e)
- }
-
- val conn = connectionsByKey.getOrElse(key, null)
- if (conn == null) return
-
- // Pushing to connect threadpool
- handleConnectExecutor.execute(new Runnable {
- override def run() {
- try {
- conn.callOnExceptionCallback(e)
- } catch {
- // ignore exceptions
- case e: Exception => logDebug("Ignoring exception", e)
- }
- try {
- conn.close()
- } catch {
- // ignore exceptions
- case e: Exception => logDebug("Ignoring exception", e)
- }
- }
- })
- }
-
-
- def run() {
- try {
- while(!selectorThread.isInterrupted) {
- while (! registerRequests.isEmpty) {
- val conn: SendingConnection = registerRequests.dequeue
- addListeners(conn)
- conn.connect()
- addConnection(conn)
- }
-
- while(!keyInterestChangeRequests.isEmpty) {
- val (key, ops) = keyInterestChangeRequests.dequeue
-
- try {
- if (key.isValid) {
- val connection = connectionsByKey.getOrElse(key, null)
- if (connection != null) {
- val lastOps = key.interestOps()
- key.interestOps(ops)
-
- // hot loop - prevent materialization of string if trace not enabled.
- if (isTraceEnabled()) {
- def intToOpStr(op: Int): String = {
- val opStrs = ArrayBuffer[String]()
- if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
- if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
- if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
- if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
- if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
- }
-
- logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() +
- "] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
- }
- }
- } else {
- logInfo("Key not valid ? " + key)
- throw new CancelledKeyException()
- }
- } catch {
- case e: CancelledKeyException => {
- logInfo("key already cancelled ? " + key, e)
- triggerForceCloseByException(key, e)
- }
- case e: Exception => {
- logError("Exception processing key " + key, e)
- triggerForceCloseByException(key, e)
- }
- }
- }
-
- val selectedKeysCount =
- try {
- selector.select()
- } catch {
- // Explicitly only dealing with CancelledKeyException here since other exceptions should be dealt with differently.
- case e: CancelledKeyException => {
- // Some keys within the selectors list are invalid/closed. clear them.
- val allKeys = selector.keys().iterator()
-
- while (allKeys.hasNext()) {
- val key = allKeys.next()
- try {
- if (! key.isValid) {
- logInfo("Key not valid ? " + key)
- throw new CancelledKeyException()
- }
- } catch {
- case e: CancelledKeyException => {
- logInfo("key already cancelled ? " + key, e)
- triggerForceCloseByException(key, e)
- }
- case e: Exception => {
- logError("Exception processing key " + key, e)
- triggerForceCloseByException(key, e)
- }
- }
- }
- }
- 0
- }
-
- if (selectedKeysCount == 0) {
- logDebug("Selector selected " + selectedKeysCount + " of " + selector.keys.size + " keys")
- }
- if (selectorThread.isInterrupted) {
- logInfo("Selector thread was interrupted!")
- return
- }
-
- if (0 != selectedKeysCount) {
- val selectedKeys = selector.selectedKeys().iterator()
- while (selectedKeys.hasNext()) {
- val key = selectedKeys.next
- selectedKeys.remove()
- try {
- if (key.isValid) {
- if (key.isAcceptable) {
- acceptConnection(key)
- } else
- if (key.isConnectable) {
- triggerConnect(key)
- } else
- if (key.isReadable) {
- triggerRead(key)
- } else
- if (key.isWritable) {
- triggerWrite(key)
- }
- } else {
- logInfo("Key not valid ? " + key)
- throw new CancelledKeyException()
- }
- } catch {
- // weird, but we saw this happening - even though key.isValid was true, key.isAcceptable would throw CancelledKeyException.
- case e: CancelledKeyException => {
- logInfo("key already cancelled ? " + key, e)
- triggerForceCloseByException(key, e)
- }
- case e: Exception => {
- logError("Exception processing key " + key, e)
- triggerForceCloseByException(key, e)
- }
- }
- }
- }
- }
- } catch {
- case e: Exception => logError("Error in select loop", e)
- }
- }
-
- def acceptConnection(key: SelectionKey) {
- val serverChannel = key.channel.asInstanceOf[ServerSocketChannel]
-
- var newChannel = serverChannel.accept()
-
- // accept them all in a tight loop. non blocking accept with no processing, should be fine
- while (newChannel != null) {
- try {
- val newConnection = new ReceivingConnection(newChannel, selector)
- newConnection.onReceive(receiveMessage)
- addListeners(newConnection)
- addConnection(newConnection)
- logInfo("Accepted connection from [" + newConnection.remoteAddress.getAddress + "]")
- } catch {
- // might happen in case of issues with registering with selector
- case e: Exception => logError("Error in accept loop", e)
- }
-
- newChannel = serverChannel.accept()
- }
- }
-
- private def addListeners(connection: Connection) {
- connection.onKeyInterestChange(changeConnectionKeyInterest)
- connection.onException(handleConnectionError)
- connection.onClose(removeConnection)
- }
-
- def addConnection(connection: Connection) {
- connectionsByKey += ((connection.key, connection))
- }
-
- def removeConnection(connection: Connection) {
- connectionsByKey -= connection.key
-
- try {
- if (connection.isInstanceOf[SendingConnection]) {
- val sendingConnection = connection.asInstanceOf[SendingConnection]
- val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
- logInfo("Removing SendingConnection to " + sendingConnectionManagerId)
-
- connectionsById -= sendingConnectionManagerId
-
- messageStatuses.synchronized {
- messageStatuses
- .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => {
- logInfo("Notifying " + status)
- status.synchronized {
- status.attempted = true
- status.acked = false
- status.markDone()
- }
- })
-
- messageStatuses.retain((i, status) => {
- status.connectionManagerId != sendingConnectionManagerId
- })
- }
- } else if (connection.isInstanceOf[ReceivingConnection]) {
- val receivingConnection = connection.asInstanceOf[ReceivingConnection]
- val remoteConnectionManagerId = receivingConnection.getRemoteConnectionManagerId()
- logInfo("Removing ReceivingConnection to " + remoteConnectionManagerId)
-
- val sendingConnectionOpt = connectionsById.get(remoteConnectionManagerId)
- if (! sendingConnectionOpt.isDefined) {
- logError("Corresponding SendingConnectionManagerId not found")
- return
- }
-
- val sendingConnection = sendingConnectionOpt.get
- connectionsById -= remoteConnectionManagerId
- sendingConnection.close()
-
- val sendingConnectionManagerId = sendingConnection.getRemoteConnectionManagerId()
-
- assert (sendingConnectionManagerId == remoteConnectionManagerId)
-
- messageStatuses.synchronized {
- for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
- logInfo("Notifying " + s)
- s.synchronized {
- s.attempted = true
- s.acked = false
- s.markDone()
- }
- }
-
- messageStatuses.retain((i, status) => {
- status.connectionManagerId != sendingConnectionManagerId
- })
- }
- }
- } finally {
- // So that the selection keys can be removed.
- wakeupSelector()
- }
- }
-
- def handleConnectionError(connection: Connection, e: Exception) {
- logInfo("Handling connection error on connection to " + connection.getRemoteConnectionManagerId())
- removeConnection(connection)
- }
-
- def changeConnectionKeyInterest(connection: Connection, ops: Int) {
- keyInterestChangeRequests += ((connection.key, ops))
- // so that registerations happen !
- wakeupSelector()
- }
-
- def receiveMessage(connection: Connection, message: Message) {
- val connectionManagerId = ConnectionManagerId.fromSocketAddress(message.senderAddress)
- logDebug("Received [" + message + "] from [" + connectionManagerId + "]")
- val runnable = new Runnable() {
- val creationTime = System.currentTimeMillis
- def run() {
- logDebug("Handler thread delay is " + (System.currentTimeMillis - creationTime) + " ms")
- handleMessage(connectionManagerId, message)
- logDebug("Handling delay is " + (System.currentTimeMillis - creationTime) + " ms")
- }
- }
- handleMessageExecutor.execute(runnable)
- /*handleMessage(connection, message)*/
- }
-
- private def handleMessage(connectionManagerId: ConnectionManagerId, message: Message) {
- logDebug("Handling [" + message + "] from [" + connectionManagerId + "]")
- message match {
- case bufferMessage: BufferMessage => {
- if (bufferMessage.hasAckId) {
- val sentMessageStatus = messageStatuses.synchronized {
- messageStatuses.get(bufferMessage.ackId) match {
- case Some(status) => {
- messageStatuses -= bufferMessage.ackId
- status
- }
- case None => {
- throw new Exception("Could not find reference for received ack message " + message.id)
- null
- }
- }
- }
- sentMessageStatus.synchronized {
- sentMessageStatus.ackMessage = Some(message)
- sentMessageStatus.attempted = true
- sentMessageStatus.acked = true
- sentMessageStatus.markDone()
- }
- } else {
- val ackMessage = if (onReceiveCallback != null) {
- logDebug("Calling back")
- onReceiveCallback(bufferMessage, connectionManagerId)
- } else {
- logDebug("Not calling back as callback is null")
- None
- }
-
- if (ackMessage.isDefined) {
- if (!ackMessage.get.isInstanceOf[BufferMessage]) {
- logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type " + ackMessage.get.getClass())
- } else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) {
- logDebug("Response to " + bufferMessage + " does not have ack id set")
- ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id
- }
- }
-
- sendMessage(connectionManagerId, ackMessage.getOrElse {
- Message.createBufferMessage(bufferMessage.id)
- })
- }
- }
- case _ => throw new Exception("Unknown type message received")
- }
- }
-
- private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
- def startNewConnection(): SendingConnection = {
- val inetSocketAddress = new InetSocketAddress(connectionManagerId.host, connectionManagerId.port)
- val newConnection = new SendingConnection(inetSocketAddress, selector, connectionManagerId)
- registerRequests.enqueue(newConnection)
-
- newConnection
- }
- // I removed the lookupKey stuff as part of merge ... should I re-add it ? We did not find it useful in our test-env ...
- // If we do re-add it, we should consistently use it everywhere I guess ?
- val connection = connectionsById.getOrElseUpdate(connectionManagerId, startNewConnection())
- message.senderAddress = id.toSocketAddress()
- logDebug("Sending [" + message + "] to [" + connectionManagerId + "]")
- connection.send(message)
-
- wakeupSelector()
- }
-
- private def wakeupSelector() {
- selector.wakeup()
- }
-
- def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
- : Future[Option[Message]] = {
- val promise = Promise[Option[Message]]
- val status = new MessageStatus(message, connectionManagerId, s => promise.success(s.ackMessage))
- messageStatuses.synchronized {
- messageStatuses += ((message.id, status))
- }
- sendMessage(connectionManagerId, message)
- promise.future
- }
-
- def sendMessageReliablySync(connectionManagerId: ConnectionManagerId, message: Message): Option[Message] = {
- Await.result(sendMessageReliably(connectionManagerId, message), Duration.Inf)
- }
-
- def onReceiveMessage(callback: (Message, ConnectionManagerId) => Option[Message]) {
- onReceiveCallback = callback
- }
-
- def stop() {
- selectorThread.interrupt()
- selectorThread.join()
- selector.close()
- val connections = connectionsByKey.values
- connections.foreach(_.close())
- if (connectionsByKey.size != 0) {
- logWarning("All connections not cleaned up")
- }
- handleMessageExecutor.shutdown()
- handleReadWriteExecutor.shutdown()
- handleConnectExecutor.shutdown()
- logInfo("ConnectionManager stopped")
- }
-}
-
-
-private[spark] object ConnectionManager {
-
- def main(args: Array[String]) {
- val manager = new ConnectionManager(9999)
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- println("Received [" + msg + "] from [" + id + "]")
- None
- })
-
- /*testSequentialSending(manager)*/
- /*System.gc()*/
-
- /*testParallelSending(manager)*/
- /*System.gc()*/
-
- /*testParallelDecreasingSending(manager)*/
- /*System.gc()*/
-
- testContinuousSending(manager)
- System.gc()
- }
-
- def testSequentialSending(manager: ConnectionManager) {
- println("--------------------------")
- println("Sequential Sending")
- println("--------------------------")
- val size = 10 * 1024 * 1024
- val count = 10
-
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
-
- (0 until count).map(i => {
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- manager.sendMessageReliablySync(manager.id, bufferMessage)
- })
- println("--------------------------")
- println()
- }
-
- def testParallelSending(manager: ConnectionManager) {
- println("--------------------------")
- println("Parallel Sending")
- println("--------------------------")
- val size = 10 * 1024 * 1024
- val count = 10
-
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
-
- val startTime = System.currentTimeMillis
- (0 until count).map(i => {
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- manager.sendMessageReliably(manager.id, bufferMessage)
- }).foreach(f => {
- val g = Await.result(f, 1 second)
- if (!g.isDefined) println("Failed")
- })
- val finishTime = System.currentTimeMillis
-
- val mb = size * count / 1024.0 / 1024.0
- val ms = finishTime - startTime
- val tput = mb * 1000.0 / ms
- println("--------------------------")
- println("Started at " + startTime + ", finished at " + finishTime)
- println("Sent " + count + " messages of size " + size + " in " + ms + " ms (" + tput + " MB/s)")
- println("--------------------------")
- println()
- }
-
- def testParallelDecreasingSending(manager: ConnectionManager) {
- println("--------------------------")
- println("Parallel Decreasing Sending")
- println("--------------------------")
- val size = 10 * 1024 * 1024
- val count = 10
- val buffers = Array.tabulate(count)(i => ByteBuffer.allocate(size * (i + 1)).put(Array.tabulate[Byte](size * (i + 1))(x => x.toByte)))
- buffers.foreach(_.flip)
- val mb = buffers.map(_.remaining).reduceLeft(_ + _) / 1024.0 / 1024.0
-
- val startTime = System.currentTimeMillis
- (0 until count).map(i => {
- val bufferMessage = Message.createBufferMessage(buffers(count - 1 - i).duplicate)
- manager.sendMessageReliably(manager.id, bufferMessage)
- }).foreach(f => {
- val g = Await.result(f, 1 second)
- if (!g.isDefined) println("Failed")
- })
- val finishTime = System.currentTimeMillis
-
- val ms = finishTime - startTime
- val tput = mb * 1000.0 / ms
- println("--------------------------")
- /*println("Started at " + startTime + ", finished at " + finishTime) */
- println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
- println("--------------------------")
- println()
- }
-
- def testContinuousSending(manager: ConnectionManager) {
- println("--------------------------")
- println("Continuous Sending")
- println("--------------------------")
- val size = 10 * 1024 * 1024
- val count = 10
-
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
-
- val startTime = System.currentTimeMillis
- while(true) {
- (0 until count).map(i => {
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- manager.sendMessageReliably(manager.id, bufferMessage)
- }).foreach(f => {
- val g = Await.result(f, 1 second)
- if (!g.isDefined) println("Failed")
- })
- val finishTime = System.currentTimeMillis
- Thread.sleep(1000)
- val mb = size * count / 1024.0 / 1024.0
- val ms = finishTime - startTime
- val tput = mb * 1000.0 / ms
- println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
- println("--------------------------")
- println()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/ConnectionManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/ConnectionManagerId.scala b/core/src/main/scala/spark/network/ConnectionManagerId.scala
deleted file mode 100644
index 9d5c518..0000000
--- a/core/src/main/scala/spark/network/ConnectionManagerId.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network
-
-import java.net.InetSocketAddress
-
-import spark.Utils
-
-
-private[spark] case class ConnectionManagerId(host: String, port: Int) {
- // DEBUG code
- Utils.checkHost(host)
- assert (port > 0)
-
- def toSocketAddress() = new InetSocketAddress(host, port)
-}
-
-
-private[spark] object ConnectionManagerId {
- def fromSocketAddress(socketAddress: InetSocketAddress): ConnectionManagerId = {
- new ConnectionManagerId(socketAddress.getHostName(), socketAddress.getPort())
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/ConnectionManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/ConnectionManagerTest.scala b/core/src/main/scala/spark/network/ConnectionManagerTest.scala
deleted file mode 100644
index 9e3827a..0000000
--- a/core/src/main/scala/spark/network/ConnectionManagerTest.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network
-
-import spark._
-import spark.SparkContext._
-
-import scala.io.Source
-
-import java.nio.ByteBuffer
-import java.net.InetAddress
-
-import akka.dispatch.Await
-import akka.util.duration._
-
-private[spark] object ConnectionManagerTest extends Logging{
- def main(args: Array[String]) {
- //<mesos cluster> - the master URL
- //<slaves file> - a list slaves to run connectionTest on
- //[num of tasks] - the number of parallel tasks to be initiated default is number of slave hosts
- //[size of msg in MB (integer)] - the size of messages to be sent in each task, default is 10
- //[count] - how many times to run, default is 3
- //[await time in seconds] : await time (in seconds), default is 600
- if (args.length < 2) {
- println("Usage: ConnectionManagerTest <mesos cluster> <slaves file> [num of tasks] [size of msg in MB (integer)] [count] [await time in seconds)] ")
- System.exit(1)
- }
-
- if (args(0).startsWith("local")) {
- println("This runs only on a mesos cluster")
- }
-
- val sc = new SparkContext(args(0), "ConnectionManagerTest")
- val slavesFile = Source.fromFile(args(1))
- val slaves = slavesFile.mkString.split("\n")
- slavesFile.close()
-
- /*println("Slaves")*/
- /*slaves.foreach(println)*/
- val tasknum = if (args.length > 2) args(2).toInt else slaves.length
- val size = ( if (args.length > 3) (args(3).toInt) else 10 ) * 1024 * 1024
- val count = if (args.length > 4) args(4).toInt else 3
- val awaitTime = (if (args.length > 5) args(5).toInt else 600 ).second
- println("Running "+count+" rounds of test: " + "parallel tasks = " + tasknum + ", msg size = " + size/1024/1024 + " MB, awaitTime = " + awaitTime)
- val slaveConnManagerIds = sc.parallelize(0 until tasknum, tasknum).map(
- i => SparkEnv.get.connectionManager.id).collect()
- println("\nSlave ConnectionManagerIds")
- slaveConnManagerIds.foreach(println)
- println
-
- (0 until count).foreach(i => {
- val resultStrs = sc.parallelize(0 until tasknum, tasknum).map(i => {
- val connManager = SparkEnv.get.connectionManager
- val thisConnManagerId = connManager.id
- connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- logInfo("Received [" + msg + "] from [" + id + "]")
- None
- })
-
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
-
- val startTime = System.currentTimeMillis
- val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map(slaveConnManagerId => {
- val bufferMessage = Message.createBufferMessage(buffer.duplicate)
- logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
- connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
- })
- val results = futures.map(f => Await.result(f, awaitTime))
- val finishTime = System.currentTimeMillis
- Thread.sleep(5000)
-
- val mb = size * results.size / 1024.0 / 1024.0
- val ms = finishTime - startTime
- val resultStr = thisConnManagerId + " Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
- logInfo(resultStr)
- resultStr
- }).collect()
-
- println("---------------------")
- println("Run " + i)
- resultStrs.foreach(println)
- println("---------------------")
- })
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/Message.scala b/core/src/main/scala/spark/network/Message.scala
deleted file mode 100644
index a25457e..0000000
--- a/core/src/main/scala/spark/network/Message.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network
-
-import java.nio.ByteBuffer
-import java.net.InetSocketAddress
-
-import scala.collection.mutable.ArrayBuffer
-
-
-private[spark] abstract class Message(val typ: Long, val id: Int) {
- var senderAddress: InetSocketAddress = null
- var started = false
- var startTime = -1L
- var finishTime = -1L
-
- def size: Int
-
- def getChunkForSending(maxChunkSize: Int): Option[MessageChunk]
-
- def getChunkForReceiving(chunkSize: Int): Option[MessageChunk]
-
- def timeTaken(): String = (finishTime - startTime).toString + " ms"
-
- override def toString = this.getClass.getSimpleName + "(id = " + id + ", size = " + size + ")"
-}
-
-
-private[spark] object Message {
- val BUFFER_MESSAGE = 1111111111L
-
- var lastId = 1
-
- def getNewId() = synchronized {
- lastId += 1
- if (lastId == 0) {
- lastId += 1
- }
- lastId
- }
-
- def createBufferMessage(dataBuffers: Seq[ByteBuffer], ackId: Int): BufferMessage = {
- if (dataBuffers == null) {
- return new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer], ackId)
- }
- if (dataBuffers.exists(_ == null)) {
- throw new Exception("Attempting to create buffer message with null buffer")
- }
- return new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId)
- }
-
- def createBufferMessage(dataBuffers: Seq[ByteBuffer]): BufferMessage =
- createBufferMessage(dataBuffers, 0)
-
- def createBufferMessage(dataBuffer: ByteBuffer, ackId: Int): BufferMessage = {
- if (dataBuffer == null) {
- return createBufferMessage(Array(ByteBuffer.allocate(0)), ackId)
- } else {
- return createBufferMessage(Array(dataBuffer), ackId)
- }
- }
-
- def createBufferMessage(dataBuffer: ByteBuffer): BufferMessage =
- createBufferMessage(dataBuffer, 0)
-
- def createBufferMessage(ackId: Int): BufferMessage = {
- createBufferMessage(new Array[ByteBuffer](0), ackId)
- }
-
- def create(header: MessageChunkHeader): Message = {
- val newMessage: Message = header.typ match {
- case BUFFER_MESSAGE => new BufferMessage(header.id,
- ArrayBuffer(ByteBuffer.allocate(header.totalSize)), header.other)
- }
- newMessage.senderAddress = header.address
- newMessage
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/MessageChunk.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/MessageChunk.scala b/core/src/main/scala/spark/network/MessageChunk.scala
deleted file mode 100644
index 784db5a..0000000
--- a/core/src/main/scala/spark/network/MessageChunk.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.ArrayBuffer
-
-
-private[network]
-class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) {
-
- val size = if (buffer == null) 0 else buffer.remaining
-
- lazy val buffers = {
- val ab = new ArrayBuffer[ByteBuffer]()
- ab += header.buffer
- if (buffer != null) {
- ab += buffer
- }
- ab
- }
-
- override def toString = {
- "" + this.getClass.getSimpleName + " (id = " + header.id + ", size = " + size + ")"
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/MessageChunkHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/MessageChunkHeader.scala b/core/src/main/scala/spark/network/MessageChunkHeader.scala
deleted file mode 100644
index 18d0cbc..0000000
--- a/core/src/main/scala/spark/network/MessageChunkHeader.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network
-
-import java.net.InetAddress
-import java.net.InetSocketAddress
-import java.nio.ByteBuffer
-
-
-private[spark] class MessageChunkHeader(
- val typ: Long,
- val id: Int,
- val totalSize: Int,
- val chunkSize: Int,
- val other: Int,
- val address: InetSocketAddress) {
- lazy val buffer = {
- // No need to change this, at 'use' time, we do a reverse lookup of the hostname.
- // Refer to network.Connection
- val ip = address.getAddress.getAddress()
- val port = address.getPort()
- ByteBuffer.
- allocate(MessageChunkHeader.HEADER_SIZE).
- putLong(typ).
- putInt(id).
- putInt(totalSize).
- putInt(chunkSize).
- putInt(other).
- putInt(ip.size).
- put(ip).
- putInt(port).
- position(MessageChunkHeader.HEADER_SIZE).
- flip.asInstanceOf[ByteBuffer]
- }
-
- override def toString = "" + this.getClass.getSimpleName + ":" + id + " of type " + typ +
- " and sizes " + totalSize + " / " + chunkSize + " bytes"
-}
-
-
-private[spark] object MessageChunkHeader {
- val HEADER_SIZE = 40
-
- def create(buffer: ByteBuffer): MessageChunkHeader = {
- if (buffer.remaining != HEADER_SIZE) {
- throw new IllegalArgumentException("Cannot convert buffer data to Message")
- }
- val typ = buffer.getLong()
- val id = buffer.getInt()
- val totalSize = buffer.getInt()
- val chunkSize = buffer.getInt()
- val other = buffer.getInt()
- val ipSize = buffer.getInt()
- val ipBytes = new Array[Byte](ipSize)
- buffer.get(ipBytes)
- val ip = InetAddress.getByAddress(ipBytes)
- val port = buffer.getInt()
- new MessageChunkHeader(typ, id, totalSize, chunkSize, other, new InetSocketAddress(ip, port))
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/ReceiverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/ReceiverTest.scala b/core/src/main/scala/spark/network/ReceiverTest.scala
deleted file mode 100644
index 2bbc736..0000000
--- a/core/src/main/scala/spark/network/ReceiverTest.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network
-
-import java.nio.ByteBuffer
-import java.net.InetAddress
-
-private[spark] object ReceiverTest {
-
- def main(args: Array[String]) {
- val manager = new ConnectionManager(9999)
- println("Started connection manager with id = " + manager.id)
-
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- /*println("Received [" + msg + "] from [" + id + "] at " + System.currentTimeMillis)*/
- val buffer = ByteBuffer.wrap("response".getBytes())
- Some(Message.createBufferMessage(buffer, msg.id))
- })
- Thread.currentThread.join()
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/SenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/SenderTest.scala b/core/src/main/scala/spark/network/SenderTest.scala
deleted file mode 100644
index 542c54c..0000000
--- a/core/src/main/scala/spark/network/SenderTest.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network
-
-import java.nio.ByteBuffer
-import java.net.InetAddress
-
-private[spark] object SenderTest {
-
- def main(args: Array[String]) {
-
- if (args.length < 2) {
- println("Usage: SenderTest <target host> <target port>")
- System.exit(1)
- }
-
- val targetHost = args(0)
- val targetPort = args(1).toInt
- val targetConnectionManagerId = new ConnectionManagerId(targetHost, targetPort)
-
- val manager = new ConnectionManager(0)
- println("Started connection manager with id = " + manager.id)
-
- manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
- println("Received [" + msg + "] from [" + id + "]")
- None
- })
-
- val size = 100 * 1024 * 1024
- val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
- buffer.flip
-
- val targetServer = args(0)
-
- val count = 100
- (0 until count).foreach(i => {
- val dataMessage = Message.createBufferMessage(buffer.duplicate)
- val startTime = System.currentTimeMillis
- /*println("Started timer at " + startTime)*/
- val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) match {
- case Some(response) =>
- val buffer = response.asInstanceOf[BufferMessage].buffers(0)
- new String(buffer.array)
- case None => "none"
- }
- val finishTime = System.currentTimeMillis
- val mb = size / 1024.0 / 1024.0
- val ms = finishTime - startTime
- /*val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"*/
- val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" + (mb / ms * 1000.0).toInt + "MB/s) | Response = " + responseStr
- println(resultStr)
- })
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/netty/FileHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/netty/FileHeader.scala b/core/src/main/scala/spark/network/netty/FileHeader.scala
deleted file mode 100644
index bf46d32..0000000
--- a/core/src/main/scala/spark/network/netty/FileHeader.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network.netty
-
-import io.netty.buffer._
-
-import spark.Logging
-
-private[spark] class FileHeader (
- val fileLen: Int,
- val blockId: String) extends Logging {
-
- lazy val buffer = {
- val buf = Unpooled.buffer()
- buf.capacity(FileHeader.HEADER_SIZE)
- buf.writeInt(fileLen)
- buf.writeInt(blockId.length)
- blockId.foreach((x: Char) => buf.writeByte(x))
- //padding the rest of header
- if (FileHeader.HEADER_SIZE - buf.readableBytes > 0 ) {
- buf.writeZero(FileHeader.HEADER_SIZE - buf.readableBytes)
- } else {
- throw new Exception("too long header " + buf.readableBytes)
- logInfo("too long header")
- }
- buf
- }
-
-}
-
-private[spark] object FileHeader {
-
- val HEADER_SIZE = 40
-
- def getFileLenOffset = 0
- def getFileLenSize = Integer.SIZE/8
-
- def create(buf: ByteBuf): FileHeader = {
- val length = buf.readInt
- val idLength = buf.readInt
- val idBuilder = new StringBuilder(idLength)
- for (i <- 1 to idLength) {
- idBuilder += buf.readByte().asInstanceOf[Char]
- }
- val blockId = idBuilder.toString()
- new FileHeader(length, blockId)
- }
-
-
- def main (args:Array[String]){
-
- val header = new FileHeader(25,"block_0");
- val buf = header.buffer;
- val newheader = FileHeader.create(buf);
- System.out.println("id="+newheader.blockId+",size="+newheader.fileLen)
-
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/netty/ShuffleCopier.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/spark/network/netty/ShuffleCopier.scala
deleted file mode 100644
index b01f636..0000000
--- a/core/src/main/scala/spark/network/netty/ShuffleCopier.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network.netty
-
-import java.util.concurrent.Executors
-
-import io.netty.buffer.ByteBuf
-import io.netty.channel.ChannelHandlerContext
-import io.netty.util.CharsetUtil
-
-import spark.Logging
-import spark.network.ConnectionManagerId
-
-import scala.collection.JavaConverters._
-
-
-private[spark] class ShuffleCopier extends Logging {
-
- def getBlock(host: String, port: Int, blockId: String,
- resultCollectCallback: (String, Long, ByteBuf) => Unit) {
-
- val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
- val connectTimeout = System.getProperty("spark.shuffle.netty.connect.timeout", "60000").toInt
- val fc = new FileClient(handler, connectTimeout)
-
- try {
- fc.init()
- fc.connect(host, port)
- fc.sendRequest(blockId)
- fc.waitForClose()
- fc.close()
- } catch {
- // Handle any socket-related exceptions in FileClient
- case e: Exception => {
- logError("Shuffle copy of block " + blockId + " from " + host + ":" + port + " failed", e)
- handler.handleError(blockId)
- }
- }
- }
-
- def getBlock(cmId: ConnectionManagerId, blockId: String,
- resultCollectCallback: (String, Long, ByteBuf) => Unit) {
- getBlock(cmId.host, cmId.port, blockId, resultCollectCallback)
- }
-
- def getBlocks(cmId: ConnectionManagerId,
- blocks: Seq[(String, Long)],
- resultCollectCallback: (String, Long, ByteBuf) => Unit) {
-
- for ((blockId, size) <- blocks) {
- getBlock(cmId, blockId, resultCollectCallback)
- }
- }
-}
-
-
-private[spark] object ShuffleCopier extends Logging {
-
- private class ShuffleClientHandler(resultCollectCallBack: (String, Long, ByteBuf) => Unit)
- extends FileClientHandler with Logging {
-
- override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) {
- logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)");
- resultCollectCallBack(header.blockId, header.fileLen.toLong, in.readBytes(header.fileLen))
- }
-
- override def handleError(blockId: String) {
- if (!isComplete) {
- resultCollectCallBack(blockId, -1, null)
- }
- }
- }
-
- def echoResultCollectCallBack(blockId: String, size: Long, content: ByteBuf) {
- if (size != -1) {
- logInfo("File: " + blockId + " content is : \" " + content.toString(CharsetUtil.UTF_8) + "\"")
- }
- }
-
- def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: ShuffleCopier <host> <port> <shuffle_block_id> <threads>")
- System.exit(1)
- }
- val host = args(0)
- val port = args(1).toInt
- val file = args(2)
- val threads = if (args.length > 3) args(3).toInt else 10
-
- val copiers = Executors.newFixedThreadPool(80)
- val tasks = (for (i <- Range(0, threads)) yield {
- Executors.callable(new Runnable() {
- def run() {
- val copier = new ShuffleCopier()
- copier.getBlock(host, port, file, echoResultCollectCallBack)
- }
- })
- }).asJava
- copiers.invokeAll(tasks)
- copiers.shutdown
- System.exit(0)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/network/netty/ShuffleSender.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/network/netty/ShuffleSender.scala b/core/src/main/scala/spark/network/netty/ShuffleSender.scala
deleted file mode 100644
index cdf88b0..0000000
--- a/core/src/main/scala/spark/network/netty/ShuffleSender.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.network.netty
-
-import java.io.File
-
-import spark.Logging
-
-
-private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {
-
- val server = new FileServer(pResolver, portIn)
- server.start()
-
- def stop() {
- server.stop()
- }
-
- def port: Int = server.getPort()
-}
-
-
-/**
- * An application for testing the shuffle sender as a standalone program.
- */
-private[spark] object ShuffleSender {
-
- def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println(
- "Usage: ShuffleSender <port> <subDirsPerLocalDir> <list of shuffle_block_directories>")
- System.exit(1)
- }
-
- val port = args(0).toInt
- val subDirsPerLocalDir = args(1).toInt
- val localDirs = args.drop(2).map(new File(_))
-
- val pResovler = new PathResolver {
- override def getAbsolutePath(blockId: String): String = {
- if (!blockId.startsWith("shuffle_")) {
- throw new Exception("Block " + blockId + " is not a shuffle block")
- }
- // Figure out which local directory it hashes to, and which subdirectory in that
- val hash = math.abs(blockId.hashCode)
- val dirId = hash % localDirs.length
- val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
- val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
- val file = new File(subDir, blockId)
- return file.getAbsolutePath
- }
- }
- val sender = new ShuffleSender(port, pResovler)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/package.scala b/core/src/main/scala/spark/package.scala
deleted file mode 100644
index b244bfb..0000000
--- a/core/src/main/scala/spark/package.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Core Spark functionality. [[spark.SparkContext]] serves as the main entry point to Spark, while
- * [[spark.RDD]] is the data type representing a distributed collection, and provides most
- * parallel operations.
- *
- * In addition, [[spark.PairRDDFunctions]] contains operations available only on RDDs of key-value
- * pairs, such as `groupByKey` and `join`; [[spark.DoubleRDDFunctions]] contains operations
- * available only on RDDs of Doubles; and [[spark.SequenceFileRDDFunctions]] contains operations
- * available on RDDs that can be saved as SequenceFiles. These operations are automatically
- * available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit conversions when
- * you `import spark.SparkContext._`.
- */
-package object spark {
- // For package docs only
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/ApproximateActionListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/spark/partial/ApproximateActionListener.scala
deleted file mode 100644
index 691d939..0000000
--- a/core/src/main/scala/spark/partial/ApproximateActionListener.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import spark._
-import spark.scheduler.JobListener
-
-/**
- * A JobListener for an approximate single-result action, such as count() or non-parallel reduce().
- * This listener waits up to timeout milliseconds and will return a partial answer even if the
- * complete answer is not available by then.
- *
- * This class assumes that the action is performed on an entire RDD[T] via a function that computes
- * a result of type U for each partition, and that the action returns a partial or complete result
- * of type R. Note that the type R must *include* any error bars on it (e.g. see BoundedInt).
- */
-private[spark] class ApproximateActionListener[T, U, R](
- rdd: RDD[T],
- func: (TaskContext, Iterator[T]) => U,
- evaluator: ApproximateEvaluator[U, R],
- timeout: Long)
- extends JobListener {
-
- val startTime = System.currentTimeMillis()
- val totalTasks = rdd.partitions.size
- var finishedTasks = 0
- var failure: Option[Exception] = None // Set if the job has failed (permanently)
- var resultObject: Option[PartialResult[R]] = None // Set if we've already returned a PartialResult
-
- override def taskSucceeded(index: Int, result: Any) {
- synchronized {
- evaluator.merge(index, result.asInstanceOf[U])
- finishedTasks += 1
- if (finishedTasks == totalTasks) {
- // If we had already returned a PartialResult, set its final value
- resultObject.foreach(r => r.setFinalValue(evaluator.currentResult()))
- // Notify any waiting thread that may have called awaitResult
- this.notifyAll()
- }
- }
- }
-
- override def jobFailed(exception: Exception) {
- synchronized {
- failure = Some(exception)
- this.notifyAll()
- }
- }
-
- /**
- * Waits for up to timeout milliseconds since the listener was created and then returns a
- * PartialResult with the result so far. This may be complete if the whole job is done.
- */
- def awaitResult(): PartialResult[R] = synchronized {
- val finishTime = startTime + timeout
- while (true) {
- val time = System.currentTimeMillis()
- if (failure != None) {
- throw failure.get
- } else if (finishedTasks == totalTasks) {
- return new PartialResult(evaluator.currentResult(), true)
- } else if (time >= finishTime) {
- resultObject = Some(new PartialResult(evaluator.currentResult(), false))
- return resultObject.get
- } else {
- this.wait(finishTime - time)
- }
- }
- // Should never be reached, but required to keep the compiler happy
- return null
- }
-}