You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2014/03/07 01:28:04 UTC

[2/3] SPARK-1189: Add Security to Spark - Akka, Http, ConnectionManager, UI use servlets

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/network/Connection.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala
index 8219a18..8fd9c2b 100644
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.network
 
+import org.apache.spark._
+import org.apache.spark.SparkSaslServer
+
+import scala.collection.mutable.{HashMap, Queue, ArrayBuffer}
+
 import java.net._
 import java.nio._
 import java.nio.channels._
@@ -27,13 +32,16 @@ import org.apache.spark._
 
 private[spark]
 abstract class Connection(val channel: SocketChannel, val selector: Selector,
-    val socketRemoteConnectionManagerId: ConnectionManagerId)
+    val socketRemoteConnectionManagerId: ConnectionManagerId, val connectionId: ConnectionId)
   extends Logging {
 
-  def this(channel_ : SocketChannel, selector_ : Selector) = {
+  var sparkSaslServer: SparkSaslServer = null
+  var sparkSaslClient: SparkSaslClient = null
+
+  def this(channel_ : SocketChannel, selector_ : Selector, id_ : ConnectionId) = {
     this(channel_, selector_,
       ConnectionManagerId.fromSocketAddress(
-        channel_.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]))
+        channel_.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]), id_)
   }
 
   channel.configureBlocking(false)
@@ -49,6 +57,16 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
 
   val remoteAddress = getRemoteAddress()
 
+  /**
+   * Used to synchronize client requests: client's work-related requests must
+   * wait until SASL authentication completes.
+   */
+  private val authenticated = new Object()
+
+  def getAuthenticated(): Object = authenticated
+
+  def isSaslComplete(): Boolean
+
   def resetForceReregister(): Boolean
 
   // Read channels typically do not register for write and write does not for read
@@ -69,6 +87,16 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
   // Will be true for ReceivingConnection, false for SendingConnection.
   def changeInterestForRead(): Boolean
 
+  private def disposeSasl() {
+    if (sparkSaslServer != null) {
+      sparkSaslServer.dispose();
+    }
+
+    if (sparkSaslClient != null) {
+      sparkSaslClient.dispose()
+    }
+  }
+
   // On receiving a write event, should we change the interest for this channel or not ?
   // Will be false for ReceivingConnection, true for SendingConnection.
   // Actually, for now, should not get triggered for ReceivingConnection
@@ -101,6 +129,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
       k.cancel()
     }
     channel.close()
+    disposeSasl()
     callOnCloseCallback()
   }
 
@@ -168,8 +197,12 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
 
 private[spark]
 class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
-    remoteId_ : ConnectionManagerId)
-  extends Connection(SocketChannel.open, selector_, remoteId_) {
+    remoteId_ : ConnectionManagerId, id_ : ConnectionId)
+  extends Connection(SocketChannel.open, selector_, remoteId_, id_) {
+
+  def isSaslComplete(): Boolean = {
+    if (sparkSaslClient != null) sparkSaslClient.isComplete() else false
+  }
 
   private class Outbox {
     val messages = new Queue[Message]()
@@ -226,6 +259,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
     data as detailed in https://github.com/mesos/spark/pull/791
    */
   private var needForceReregister = false
+
   val currentBuffers = new ArrayBuffer[ByteBuffer]()
 
   /*channel.socket.setSendBufferSize(256 * 1024)*/
@@ -316,6 +350,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
                 // If we have 'seen' pending messages, then reset flag - since we handle that as
                 // normal registering of event (below)
                 if (needForceReregister && buffers.exists(_.remaining() > 0)) resetForceReregister()
+
                 currentBuffers ++= buffers
               }
               case None => {
@@ -384,8 +419,15 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
 
 
 // Must be created within selector loop - else deadlock
-private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : Selector)
-  extends Connection(channel_, selector_) {
+private[spark] class ReceivingConnection(
+    channel_ : SocketChannel,
+    selector_ : Selector,
+    id_ : ConnectionId)
+    extends Connection(channel_, selector_, id_) {
+
+  def isSaslComplete(): Boolean = {
+    if (sparkSaslServer != null) sparkSaslServer.isComplete() else false
+  }
 
   class Inbox() {
     val messages = new HashMap[Int, BufferMessage]()
@@ -396,6 +438,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
         val newMessage = Message.create(header).asInstanceOf[BufferMessage]
         newMessage.started = true
         newMessage.startTime = System.currentTimeMillis
+        newMessage.isSecurityNeg = header.securityNeg == 1
         logDebug(
           "Starting to receive [" + newMessage + "] from [" + getRemoteConnectionManagerId() + "]")
         messages += ((newMessage.id, newMessage))
@@ -441,7 +484,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
 
   val inbox = new Inbox()
   val headerBuffer: ByteBuffer = ByteBuffer.allocate(MessageChunkHeader.HEADER_SIZE)
-  var onReceiveCallback: (Connection , Message) => Unit = null
+  var onReceiveCallback: (Connection, Message) => Unit = null
   var currentChunk: MessageChunk = null
 
   channel.register(selector, SelectionKey.OP_READ)
@@ -516,7 +559,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
         }
       }
     } catch {
-      case e: Exception  => {
+      case e: Exception => {
         logWarning("Error reading from connection to " + getRemoteConnectionManagerId(), e)
         callOnExceptionCallback(e)
         close()

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/network/ConnectionId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionId.scala b/core/src/main/scala/org/apache/spark/network/ConnectionId.scala
new file mode 100644
index 0000000..ffaab67
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionId.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+private[spark] case class ConnectionId(connectionManagerId: ConnectionManagerId, uniqId: Int) {
+  override def toString = connectionManagerId.host + "_" + connectionManagerId.port + "_" + uniqId  
+}
+
+private[spark] object ConnectionId {
+
+  def createConnectionIdFromString(connectionIdString: String): ConnectionId = {
+    val res = connectionIdString.split("_").map(_.trim())
+    if (res.size != 3) {
+      throw new Exception("Error converting ConnectionId string: " + connectionIdString + 
+        " to a ConnectionId Object")
+    }
+    new ConnectionId(new ConnectionManagerId(res(0), res(1).toInt), res(2).toInt)
+  } 
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index a7f20f8..a75130c 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -21,6 +21,9 @@ import java.net._
 import java.nio._
 import java.nio.channels._
 import java.nio.channels.spi._
+import java.net._
+import java.util.concurrent.atomic.AtomicInteger
+
 import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
 
 import scala.collection.mutable.ArrayBuffer
@@ -28,13 +31,15 @@ import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 import scala.collection.mutable.SynchronizedMap
 import scala.collection.mutable.SynchronizedQueue
+
 import scala.concurrent.{Await, ExecutionContext, Future, Promise}
 import scala.concurrent.duration._
 
 import org.apache.spark._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SystemClock, Utils}
 
-private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Logging {
+private[spark] class ConnectionManager(port: Int, conf: SparkConf,
+    securityManager: SecurityManager) extends Logging {
 
   class MessageStatus(
       val message: Message,
@@ -50,6 +55,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
 
   private val selector = SelectorProvider.provider.openSelector()
 
+  // default to 30 second timeout waiting for authentication
+  private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
+
   private val handleMessageExecutor = new ThreadPoolExecutor(
     conf.getInt("spark.core.connection.handler.threads.min", 20),
     conf.getInt("spark.core.connection.handler.threads.max", 60),
@@ -71,6 +79,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
     new LinkedBlockingDeque[Runnable]())
 
   private val serverChannel = ServerSocketChannel.open()
+  // used to track the SendingConnections waiting to do SASL negotiation
+  private val connectionsAwaitingSasl = new HashMap[ConnectionId, SendingConnection] 
+    with SynchronizedMap[ConnectionId, SendingConnection]
   private val connectionsByKey =
     new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
   private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection]
@@ -84,6 +95,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
 
   private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
 
+  private val authEnabled = securityManager.isAuthenticationEnabled()
+
   serverChannel.configureBlocking(false)
   serverChannel.socket.setReuseAddress(true)
   serverChannel.socket.setReceiveBufferSize(256 * 1024)
@@ -94,6 +107,10 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
   val id = new ConnectionManagerId(Utils.localHostName, serverChannel.socket.getLocalPort)
   logInfo("Bound socket to port " + serverChannel.socket.getLocalPort() + " with id = " + id)
 
+  // used in combination with the ConnectionManagerId to create unique Connection ids
+  // to be able to track asynchronous messages
+  private val idCount: AtomicInteger = new AtomicInteger(1)
+
   private val selectorThread = new Thread("connection-manager-thread") {
     override def run() = ConnectionManager.this.run()
   }
@@ -125,7 +142,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
         } finally {
           writeRunnableStarted.synchronized {
             writeRunnableStarted -= key
-            val needReregister = register || conn.resetForceReregister()
+            val needReregister = register || conn.resetForceReregister() 
             if (needReregister && conn.changeInterestForWrite()) {
               conn.registerInterest()
             }
@@ -372,7 +389,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
     // accept them all in a tight loop. non blocking accept with no processing, should be fine
     while (newChannel != null) {
       try {
-        val newConnection = new ReceivingConnection(newChannel, selector)
+        val newConnectionId = new ConnectionId(id, idCount.getAndIncrement.intValue)
+        val newConnection = new ReceivingConnection(newChannel, selector, newConnectionId)
         newConnection.onReceive(receiveMessage)
         addListeners(newConnection)
         addConnection(newConnection)
@@ -406,6 +424,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
         logInfo("Removing SendingConnection to " + sendingConnectionManagerId)
 
         connectionsById -= sendingConnectionManagerId
+        connectionsAwaitingSasl -= connection.connectionId
 
         messageStatuses.synchronized {
           messageStatuses
@@ -481,7 +500,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
       val creationTime = System.currentTimeMillis
       def run() {
         logDebug("Handler thread delay is " + (System.currentTimeMillis - creationTime) + " ms")
-        handleMessage(connectionManagerId, message)
+        handleMessage(connectionManagerId, message, connection)
         logDebug("Handling delay is " + (System.currentTimeMillis - creationTime) + " ms")
       }
     }
@@ -489,10 +508,133 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
     /*handleMessage(connection, message)*/
   }
 
-  private def handleMessage(connectionManagerId: ConnectionManagerId, message: Message) {
+  private def handleClientAuthentication(
+      waitingConn: SendingConnection,
+      securityMsg: SecurityMessage, 
+      connectionId : ConnectionId) {
+    if (waitingConn.isSaslComplete()) {
+      logDebug("Client sasl completed for id: "  + waitingConn.connectionId)
+      connectionsAwaitingSasl -= waitingConn.connectionId
+      waitingConn.getAuthenticated().synchronized {
+        waitingConn.getAuthenticated().notifyAll();
+      }
+      return
+    } else {
+      var replyToken : Array[Byte] = null
+      try {
+        replyToken = waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken);
+        if (waitingConn.isSaslComplete()) {
+          logDebug("Client sasl completed after evaluate for id: " + waitingConn.connectionId)
+          connectionsAwaitingSasl -= waitingConn.connectionId
+          waitingConn.getAuthenticated().synchronized {
+            waitingConn.getAuthenticated().notifyAll()
+          }
+          return
+        }
+        var securityMsgResp = SecurityMessage.fromResponse(replyToken, 
+          securityMsg.getConnectionId.toString())
+        var message = securityMsgResp.toBufferMessage
+        if (message == null) throw new Exception("Error creating security message")
+        sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message)
+      } catch  {
+        case e: Exception => {
+          logError("Error handling sasl client authentication", e)
+          waitingConn.close()
+          throw new Exception("Error evaluating sasl response: " + e)
+        }
+      }
+    }
+  }
+
+  private def handleServerAuthentication(
+      connection: Connection, 
+      securityMsg: SecurityMessage,
+      connectionId: ConnectionId) {
+    if (!connection.isSaslComplete()) {
+      logDebug("saslContext not established")
+      var replyToken : Array[Byte] = null
+      try {
+        connection.synchronized {
+          if (connection.sparkSaslServer == null) {
+            logDebug("Creating sasl Server")
+            connection.sparkSaslServer = new SparkSaslServer(securityManager)
+          }
+        }
+        replyToken = connection.sparkSaslServer.response(securityMsg.getToken)
+        if (connection.isSaslComplete()) {
+          logDebug("Server sasl completed: " + connection.connectionId) 
+        } else {
+          logDebug("Server sasl not completed: " + connection.connectionId)
+        }
+        if (replyToken != null) {
+          var securityMsgResp = SecurityMessage.fromResponse(replyToken,
+            securityMsg.getConnectionId)
+          var message = securityMsgResp.toBufferMessage
+          if (message == null) throw new Exception("Error creating security Message")
+          sendSecurityMessage(connection.getRemoteConnectionManagerId(), message)
+        } 
+      } catch {
+        case e: Exception => {
+          logError("Error in server auth negotiation: " + e)
+          // It would probably be better to send an error message telling other side auth failed
+          // but for now just close
+          connection.close()
+        }
+      }
+    } else {
+      logDebug("connection already established for this connection id: " + connection.connectionId) 
+    }
+  }
+
+
+  private def handleAuthentication(conn: Connection, bufferMessage: BufferMessage): Boolean = {
+    if (bufferMessage.isSecurityNeg) {
+      logDebug("This is security neg message")
+
+      // parse as SecurityMessage
+      val securityMsg = SecurityMessage.fromBufferMessage(bufferMessage)
+      val connectionId = ConnectionId.createConnectionIdFromString(securityMsg.getConnectionId)
+
+      connectionsAwaitingSasl.get(connectionId) match {
+        case Some(waitingConn) => {
+          // Client - this must be in response to us doing Send
+          logDebug("Client handleAuth for id: " +  waitingConn.connectionId)
+          handleClientAuthentication(waitingConn, securityMsg, connectionId)
+        }
+        case None => {
+          // Server - someone sent us something and we haven't authenticated yet
+          logDebug("Server handleAuth for id: " + connectionId)
+          handleServerAuthentication(conn, securityMsg, connectionId)
+        }
+      }
+      return true
+    } else {
+      if (!conn.isSaslComplete()) {
+        // We could handle this better and tell the client we need to do authentication 
+        // negotiation, but for now just ignore them. 
+        logError("message sent that is not security negotiation message on connection " +
+                 "not authenticated yet, ignoring it!!")
+        return true
+      }
+    }
+    return false
+  }
+
+  private def handleMessage(
+      connectionManagerId: ConnectionManagerId,
+      message: Message,
+      connection: Connection) {
     logDebug("Handling [" + message + "] from [" + connectionManagerId + "]")
     message match {
       case bufferMessage: BufferMessage => {
+        if (authEnabled) {
+          val res = handleAuthentication(connection, bufferMessage)
+          if (res == true) {
+            // message was security negotiation so skip the rest
+            logDebug("After handleAuth result was true, returning")
+            return
+          }
+        }
         if (bufferMessage.hasAckId) {
           val sentMessageStatus = messageStatuses.synchronized {
             messageStatuses.get(bufferMessage.ackId) match {
@@ -541,17 +683,124 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
     }
   }
 
+  private def checkSendAuthFirst(connManagerId: ConnectionManagerId, conn: SendingConnection) {
+    // see if we need to do sasl before writing
+    // this should only be the first negotiation as the Client!!!
+    if (!conn.isSaslComplete()) {
+      conn.synchronized {
+        if (conn.sparkSaslClient == null) {
+          conn.sparkSaslClient = new SparkSaslClient(securityManager)
+          var firstResponse: Array[Byte] = null
+          try {
+            firstResponse = conn.sparkSaslClient.firstToken()
+            var securityMsg = SecurityMessage.fromResponse(firstResponse,
+              conn.connectionId.toString())
+            var message = securityMsg.toBufferMessage
+            if (message == null) throw new Exception("Error creating security message")
+            connectionsAwaitingSasl += ((conn.connectionId, conn))
+            sendSecurityMessage(connManagerId, message)
+            logDebug("adding connectionsAwaitingSasl id: " + conn.connectionId)
+          } catch {
+            case e: Exception => {
+              logError("Error getting first response from the SaslClient.", e)
+              conn.close()
+              throw new Exception("Error getting first response from the SaslClient")
+            }
+          }
+        }
+      }
+    } else {
+      logDebug("Sasl already established ") 
+    }
+  }
+
+  // allow us to add messages to the inbox for doing sasl negotiating 
+  private def sendSecurityMessage(connManagerId: ConnectionManagerId, message: Message) {
+    def startNewConnection(): SendingConnection = {
+      val inetSocketAddress = new InetSocketAddress(connManagerId.host, connManagerId.port)
+      val newConnectionId = new ConnectionId(id, idCount.getAndIncrement.intValue)
+      val newConnection = new SendingConnection(inetSocketAddress, selector, connManagerId,
+        newConnectionId)
+      logInfo("creating new sending connection for security! " + newConnectionId )
+      registerRequests.enqueue(newConnection)
+
+      newConnection
+    }
+    // I removed the lookupKey stuff as part of merge ... should I re-add it ?
+    // We did not find it useful in our test-env ...
+    // If we do re-add it, we should consistently use it everywhere I guess ?
+    message.senderAddress = id.toSocketAddress()
+    logTrace("Sending Security [" + message + "] to [" + connManagerId + "]")
+    val connection = connectionsById.getOrElseUpdate(connManagerId, startNewConnection())
+
+    //send security message until going connection has been authenticated
+    connection.send(message)
+
+    wakeupSelector()
+  }
+
   private def sendMessage(connectionManagerId: ConnectionManagerId, message: Message) {
     def startNewConnection(): SendingConnection = {
       val inetSocketAddress = new InetSocketAddress(connectionManagerId.host,
         connectionManagerId.port)
-      val newConnection = new SendingConnection(inetSocketAddress, selector, connectionManagerId)
+      val newConnectionId = new ConnectionId(id, idCount.getAndIncrement.intValue)
+      val newConnection = new SendingConnection(inetSocketAddress, selector, connectionManagerId,
+        newConnectionId)
+      logTrace("creating new sending connection: " + newConnectionId)
       registerRequests.enqueue(newConnection)
 
       newConnection
     }
     val connection = connectionsById.getOrElseUpdate(connectionManagerId, startNewConnection())
+    if (authEnabled) {
+      checkSendAuthFirst(connectionManagerId, connection)
+    }
     message.senderAddress = id.toSocketAddress()
+    logDebug("Before Sending [" + message + "] to [" + connectionManagerId + "]" + " " +
+      "connectionid: "  + connection.connectionId)
+
+    if (authEnabled) {
+      // if we aren't authenticated yet lets block the senders until authentication completes
+      try {
+        connection.getAuthenticated().synchronized {
+          val clock = SystemClock
+          val startTime = clock.getTime()
+
+          while (!connection.isSaslComplete()) {
+            logDebug("getAuthenticated wait connectionid: " + connection.connectionId)
+            // have timeout in case remote side never responds
+            connection.getAuthenticated().wait(500)
+            if (((clock.getTime() - startTime) >= (authTimeout * 1000))
+              && (!connection.isSaslComplete())) {
+              // took to long to authenticate the connection, something probably went wrong
+              throw new Exception("Took to long for authentication to " + connectionManagerId + 
+                ", waited " + authTimeout + "seconds, failing.")
+            }
+          }
+        }
+      } catch {
+        case e: Exception => logError("Exception while waiting for authentication.", e)
+
+        // need to tell sender it failed
+        messageStatuses.synchronized {
+          val s = messageStatuses.get(message.id)
+          s match {
+            case Some(msgStatus) => {
+              messageStatuses -= message.id
+              logInfo("Notifying " + msgStatus.connectionManagerId)
+              msgStatus.synchronized {
+                msgStatus.attempted = true
+                msgStatus.acked = false
+                msgStatus.markDone()
+              }
+            }
+            case None => {
+              logError("no messageStatus for failed message id: " + message.id) 
+            }
+          }
+        }
+      }
+    }
     logDebug("Sending [" + message + "] to [" + connectionManagerId + "]")
     connection.send(message)
 
@@ -603,7 +852,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
 private[spark] object ConnectionManager {
 
   def main(args: Array[String]) {
-    val manager = new ConnectionManager(9999, new SparkConf)
+    val conf = new SparkConf
+    val manager = new ConnectionManager(9999, conf, new SecurityManager(conf))
     manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
       println("Received [" + msg + "] from [" + id + "]")
       None

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/network/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/Message.scala b/core/src/main/scala/org/apache/spark/network/Message.scala
index 20fe676..7caccfd 100644
--- a/core/src/main/scala/org/apache/spark/network/Message.scala
+++ b/core/src/main/scala/org/apache/spark/network/Message.scala
@@ -27,6 +27,7 @@ private[spark] abstract class Message(val typ: Long, val id: Int) {
   var started = false
   var startTime = -1L
   var finishTime = -1L
+  var isSecurityNeg = false
 
   def size: Int
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/network/MessageChunkHeader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/MessageChunkHeader.scala b/core/src/main/scala/org/apache/spark/network/MessageChunkHeader.scala
index 9bcbc61..ead663e 100644
--- a/core/src/main/scala/org/apache/spark/network/MessageChunkHeader.scala
+++ b/core/src/main/scala/org/apache/spark/network/MessageChunkHeader.scala
@@ -27,6 +27,7 @@ private[spark] class MessageChunkHeader(
     val totalSize: Int,
     val chunkSize: Int,
     val other: Int,
+    val securityNeg: Int,
     val address: InetSocketAddress) {
   lazy val buffer = {
     // No need to change this, at 'use' time, we do a reverse lookup of the hostname.
@@ -40,6 +41,7 @@ private[spark] class MessageChunkHeader(
       putInt(totalSize).
       putInt(chunkSize).
       putInt(other).
+      putInt(securityNeg).
       putInt(ip.size).
       put(ip).
       putInt(port).
@@ -48,12 +50,13 @@ private[spark] class MessageChunkHeader(
   }
 
   override def toString = "" + this.getClass.getSimpleName + ":" + id + " of type " + typ +
-      " and sizes " + totalSize + " / " + chunkSize + " bytes"
+      " and sizes " + totalSize + " / " + chunkSize + " bytes, securityNeg: " + securityNeg
+
 }
 
 
 private[spark] object MessageChunkHeader {
-  val HEADER_SIZE = 40
+  val HEADER_SIZE = 44
 
   def create(buffer: ByteBuffer): MessageChunkHeader = {
     if (buffer.remaining != HEADER_SIZE) {
@@ -64,11 +67,13 @@ private[spark] object MessageChunkHeader {
     val totalSize = buffer.getInt()
     val chunkSize = buffer.getInt()
     val other = buffer.getInt()
+    val securityNeg = buffer.getInt()
     val ipSize = buffer.getInt()
     val ipBytes = new Array[Byte](ipSize)
     buffer.get(ipBytes)
     val ip = InetAddress.getByAddress(ipBytes)
     val port = buffer.getInt()
-    new MessageChunkHeader(typ, id, totalSize, chunkSize, other, new InetSocketAddress(ip, port))
+    new MessageChunkHeader(typ, id, totalSize, chunkSize, other, securityNeg,
+      new InetSocketAddress(ip, port))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
index 9976255..3c09a71 100644
--- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
@@ -18,12 +18,12 @@
 package org.apache.spark.network
 
 import java.nio.ByteBuffer
-
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf} 
 
 private[spark] object ReceiverTest {
   def main(args: Array[String]) {
-    val manager = new ConnectionManager(9999, new SparkConf)
+    val conf = new SparkConf
+    val manager = new ConnectionManager(9999, conf, new SecurityManager(conf))
     println("Started connection manager with id = " + manager.id)
 
     manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/network/SecurityMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/SecurityMessage.scala b/core/src/main/scala/org/apache/spark/network/SecurityMessage.scala
new file mode 100644
index 0000000..0d9f743
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/SecurityMessage.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.StringBuilder
+
+import org.apache.spark._
+import org.apache.spark.network._
+
+/**
+ * SecurityMessage is class that contains the connectionId and sasl token 
+ * used in SASL negotiation. SecurityMessage has routines for converting
+ * it to and from a BufferMessage so that it can be sent by the ConnectionManager
+ * and easily consumed by users when received.
+ * The api was modeled after BlockMessage.
+ *
+ * The connectionId is the connectionId of the client side. Since 
+ * message passing is asynchronous and its possible for the server side (receiving)
+ * to get multiple different types of messages on the same connection the connectionId 
+ * is used to know which connnection the security message is intended for. 
+ * 
+ * For instance, lets say we are node_0. We need to send data to node_1. The node_0 side
+ * is acting as a client and connecting to node_1. SASL negotiation has to occur
+ * between node_0 and node_1 before node_1 trusts node_0 so node_0 sends a security message. 
+ * node_1 receives the message from node_0 but before it can process it and send a response, 
+ * some thread on node_1 decides it needs to send data to node_0 so it connects to node_0 
+ * and sends a security message of its own to authenticate as a client. Now node_0 gets 
+ * the message and it needs to decide if this message is in response to it being a client 
+ * (from the first send) or if its just node_1 trying to connect to it to send data.  This 
+ * is where the connectionId field is used. node_0 can lookup the connectionId to see if
+ * it is in response to it being a client or if its in response to someone sending other data.
+ * 
+ * The format of a SecurityMessage as its sent is:
+ *   - Length of the ConnectionId
+ *   - ConnectionId 
+ *   - Length of the token
+ *   - Token 
+ */
+private[spark] class SecurityMessage() extends Logging {
+
+  private var connectionId: String = null
+  private var token: Array[Byte] = null
+
+  def set(byteArr: Array[Byte], newconnectionId: String) {
+    if (byteArr == null) {
+      token = new Array[Byte](0) 
+    } else {
+      token = byteArr
+    }
+    connectionId = newconnectionId
+  }
+ 
+  /**
+   * Read the given buffer and set the members of this class.
+   */
+  def set(buffer: ByteBuffer) {
+    val idLength = buffer.getInt()
+    val idBuilder = new StringBuilder(idLength)
+    for (i <- 1 to idLength) {
+        idBuilder += buffer.getChar()
+    }
+    connectionId  = idBuilder.toString()
+
+    val tokenLength = buffer.getInt()
+    token = new Array[Byte](tokenLength)
+    if (tokenLength > 0) {
+      buffer.get(token, 0, tokenLength)
+    }
+  }
+
+  def set(bufferMsg: BufferMessage) {
+    val buffer = bufferMsg.buffers.apply(0)
+    buffer.clear()
+    set(buffer)
+  }
+  
+  def getConnectionId: String = {
+    return connectionId
+  }
+  
+  def getToken: Array[Byte] = {
+    return token
+  }
+  
+  /**
+   * Create a BufferMessage that can be sent by the ConnectionManager containing 
+   * the security information from this class.
+   * @return BufferMessage
+   */
+  def toBufferMessage: BufferMessage = {
+    val startTime = System.currentTimeMillis
+    val buffers = new ArrayBuffer[ByteBuffer]()
+
+    // 4 bytes for the length of the connectionId
+    // connectionId is of type char so multiple the length by 2 to get number of bytes 
+    // 4 bytes for the length of token
+    // token is a byte buffer so just take the length
+    var buffer = ByteBuffer.allocate(4 + connectionId.length() * 2 + 4 + token.length)
+    buffer.putInt(connectionId.length())
+    connectionId.foreach((x: Char) => buffer.putChar(x)) 
+    buffer.putInt(token.length)
+
+    if (token.length > 0) {
+      buffer.put(token)
+    }
+    buffer.flip()
+    buffers += buffer
+    
+    var message = Message.createBufferMessage(buffers)
+    logDebug("message total size is : " + message.size)
+    message.isSecurityNeg = true
+    return message
+  }
+
+  override def toString: String = {
+    "SecurityMessage [connId= " + connectionId + ", Token = " + token + "]"
+  }
+}
+
+private[spark] object SecurityMessage {
+ 
+  /**
+   * Convert the given BufferMessage to a SecurityMessage by parsing the contents
+   * of the BufferMessage and populating the SecurityMessage fields.
+   * @param bufferMessage is a BufferMessage that was received
+   * @return new SecurityMessage
+   */
+  def fromBufferMessage(bufferMessage: BufferMessage): SecurityMessage = {
+    val newSecurityMessage = new SecurityMessage()
+    newSecurityMessage.set(bufferMessage)
+    newSecurityMessage
+  }
+
+  /**
+   * Create a SecurityMessage to send from a given saslResponse.
+   * @param response is the response to a challenge from the SaslClient or Saslserver
+   * @param connectionId the client connectionId we are negotiation authentication for
+   * @return a new SecurityMessage
+   */
+  def fromResponse(response : Array[Byte], connectionId : String) : SecurityMessage = {
+    val newSecurityMessage = new SecurityMessage()
+    newSecurityMessage.set(response, connectionId)
+    newSecurityMessage
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/network/SenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index 646f842..aac2c24 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.network
 
 import java.nio.ByteBuffer
-
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
 
 private[spark] object SenderTest {
   def main(args: Array[String]) {
@@ -32,8 +31,8 @@ private[spark] object SenderTest {
     val targetHost = args(0)
     val targetPort = args(1).toInt
     val targetConnectionManagerId = new ConnectionManagerId(targetHost, targetPort)
-
-    val manager = new ConnectionManager(0, new SparkConf)
+    val conf = new SparkConf
+    val manager = new ConnectionManager(0, conf, new SecurityManager(conf))
     println("Started connection manager with id = " + manager.id)
 
     manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 977c246..1bf3f4d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,7 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
 import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
 import sun.nio.ch.DirectBuffer
 
-import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
+import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException, SecurityManager}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
@@ -47,7 +47,8 @@ private[spark] class BlockManager(
     val master: BlockManagerMaster,
     val defaultSerializer: Serializer,
     maxMemory: Long,
-    val conf: SparkConf)
+    val conf: SparkConf,
+    securityManager: SecurityManager)
   extends Logging {
 
   val shuffleBlockManager = new ShuffleBlockManager(this)
@@ -66,7 +67,7 @@ private[spark] class BlockManager(
     if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0
   }
 
-  val connectionManager = new ConnectionManager(0, conf)
+  val connectionManager = new ConnectionManager(0, conf, securityManager)
   implicit val futureExecContext = connectionManager.futureExecContext
 
   val blockManagerId = BlockManagerId(
@@ -122,8 +123,9 @@ private[spark] class BlockManager(
    * Construct a BlockManager with a memory limit set based on system properties.
    */
   def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
-           serializer: Serializer, conf: SparkConf) = {
-    this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf)
+           serializer: Serializer, conf: SparkConf, securityManager: SecurityManager) = {
+    this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf, 
+      securityManager)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 1d81d00..36f2a0f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -24,6 +24,7 @@ import util.Random
 
 import org.apache.spark.SparkConf
 import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.{SecurityManager, SparkConf}
 
 /**
  * This class tests the BlockManager and MemoryStore for thread safety and
@@ -98,7 +99,8 @@ private[spark] object ThreadingTest {
     val blockManagerMaster = new BlockManagerMaster(
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
     val blockManager = new BlockManager(
-      "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf)
+      "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf,
+      new SecurityManager(conf))
     val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
     val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
     producers.foreach(_.start)

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 1b78c52..7c35cd1 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.ui
 
 import java.net.InetSocketAddress
-import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
+import java.net.URL
+import javax.servlet.http.{HttpServlet, HttpServletResponse, HttpServletRequest}
 
 import scala.annotation.tailrec
 import scala.util.{Failure, Success, Try}
@@ -26,11 +27,14 @@ import scala.xml.Node
 
 import org.json4s.JValue
 import org.json4s.jackson.JsonMethods.{pretty, render}
-import org.eclipse.jetty.server.{Handler, Request, Server}
-import org.eclipse.jetty.server.handler.{AbstractHandler, ContextHandler, HandlerList, ResourceHandler}
+
+import org.eclipse.jetty.server.{DispatcherType, Server}
+import org.eclipse.jetty.server.handler.HandlerList
+import org.eclipse.jetty.servlet.{DefaultServlet, FilterHolder, ServletContextHandler, ServletHolder}
 import org.eclipse.jetty.util.thread.QueuedThreadPool
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+
 
 /** Utilities for launching a web server using Jetty's HTTP Server class */
 private[spark] object JettyUtils extends Logging {
@@ -39,57 +43,104 @@ private[spark] object JettyUtils extends Logging {
 
   type Responder[T] = HttpServletRequest => T
 
-  // Conversions from various types of Responder's to jetty Handlers
-  implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler =
-    createHandler(responder, "text/json", (in: JValue) => pretty(render(in)))
+  class ServletParams[T <% AnyRef](val responder: Responder[T],
+    val contentType: String,
+    val extractFn: T => String = (in: Any) => in.toString) {}
+
+  // Conversions from various types of Responder's to appropriate servlet parameters
+  implicit def jsonResponderToServlet(responder: Responder[JValue]): ServletParams[JValue] =
+    new ServletParams(responder, "text/json", (in: JValue) => pretty(render(in)))
 
-  implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
-    createHandler(responder, "text/html", (in: Seq[Node]) => "<!DOCTYPE html>" + in.toString)
+  implicit def htmlResponderToServlet(responder: Responder[Seq[Node]]): ServletParams[Seq[Node]] =
+    new ServletParams(responder, "text/html", (in: Seq[Node]) => "<!DOCTYPE html>" + in.toString)
 
-  implicit def textResponderToHandler(responder: Responder[String]): Handler =
-    createHandler(responder, "text/plain")
+  implicit def textResponderToServlet(responder: Responder[String]): ServletParams[String] =
+    new ServletParams(responder, "text/plain")
 
-  def createHandler[T <% AnyRef](responder: Responder[T], contentType: String,
-                                 extractFn: T => String = (in: Any) => in.toString): Handler = {
-    new AbstractHandler {
-      def handle(target: String,
-                 baseRequest: Request,
-                 request: HttpServletRequest,
+  def createServlet[T <% AnyRef](servletParams: ServletParams[T],
+      securityMgr: SecurityManager): HttpServlet = {
+    new HttpServlet {
+      override def doGet(request: HttpServletRequest,
                  response: HttpServletResponse) {
-        response.setContentType("%s;charset=utf-8".format(contentType))
-        response.setStatus(HttpServletResponse.SC_OK)
-        baseRequest.setHandled(true)
-        val result = responder(request)
-        response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
-        response.getWriter().println(extractFn(result))
+        if (securityMgr.checkUIViewPermissions(request.getRemoteUser())) {
+          response.setContentType("%s;charset=utf-8".format(servletParams.contentType))
+          response.setStatus(HttpServletResponse.SC_OK)
+          val result = servletParams.responder(request)
+          response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
+          response.getWriter().println(servletParams.extractFn(result))
+        } else {
+          response.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
+          response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate")
+          response.sendError(HttpServletResponse.SC_UNAUTHORIZED,
+            "User is not authorized to access this page.");
+        }
       }
     }
   }
 
+  def createServletHandler(path: String, servlet: HttpServlet): ServletContextHandler = {
+    val contextHandler = new ServletContextHandler()
+    val holder = new ServletHolder(servlet)
+    contextHandler.setContextPath(path)
+    contextHandler.addServlet(holder, "/")
+    contextHandler
+  }
+
   /** Creates a handler that always redirects the user to a given path */
-  def createRedirectHandler(newPath: String): Handler = {
-    new AbstractHandler {
-      def handle(target: String,
-                 baseRequest: Request,
-                 request: HttpServletRequest,
+  def createRedirectHandler(newPath: String, path: String): ServletContextHandler = {
+    val servlet = new HttpServlet {
+      override def doGet(request: HttpServletRequest,
                  response: HttpServletResponse) {
-        response.setStatus(302)
-        response.setHeader("Location", baseRequest.getRootURL + newPath)
-        baseRequest.setHandled(true)
+        // make sure we don't end up with // in the middle
+        val newUri = new URL(new URL(request.getRequestURL.toString), newPath).toURI
+        response.sendRedirect(newUri.toString)
       }
     }
+    val contextHandler = new ServletContextHandler()
+    val holder = new ServletHolder(servlet)
+    contextHandler.setContextPath(path)
+    contextHandler.addServlet(holder, "/")
+    contextHandler
   }
 
   /** Creates a handler for serving files from a static directory */
-  def createStaticHandler(resourceBase: String): ResourceHandler = {
-    val staticHandler = new ResourceHandler
+  def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = {
+    val contextHandler = new ServletContextHandler()
+    val staticHandler = new DefaultServlet
+    val holder = new ServletHolder(staticHandler)
     Option(getClass.getClassLoader.getResource(resourceBase)) match {
       case Some(res) =>
-        staticHandler.setResourceBase(res.toString)
+        holder.setInitParameter("resourceBase", res.toString)
       case None =>
         throw new Exception("Could not find resource path for Web UI: " + resourceBase)
     }
-    staticHandler
+    contextHandler.addServlet(holder, path)
+    contextHandler
+  }
+
+  private def addFilters(handlers: Seq[ServletContextHandler], conf: SparkConf) {
+    val filters: Array[String] = conf.get("spark.ui.filters", "").split(',').map(_.trim())
+    filters.foreach {
+      case filter : String => 
+        if (!filter.isEmpty) {
+          logInfo("Adding filter: " + filter)
+          val holder : FilterHolder = new FilterHolder()
+          holder.setClassName(filter)
+          // get any parameters for each filter
+          val paramName = "spark." + filter + ".params"
+          val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
+          params.foreach {
+            case param : String =>
+              if (!param.isEmpty) {
+                val parts = param.split("=")
+                if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
+             }
+          }
+          val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR, 
+            DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
+          handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }
+        }
+    }
   }
 
   /**
@@ -99,17 +150,12 @@ private[spark] object JettyUtils extends Logging {
    * If the desired port number is contented, continues incrementing ports until a free port is
    * found. Returns the chosen port and the jetty Server object.
    */
-  def startJettyServer(hostName: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int)
-  = {
-
-    val handlersToRegister = handlers.map { case(path, handler) =>
-      val contextHandler = new ContextHandler(path)
-      contextHandler.setHandler(handler)
-      contextHandler.asInstanceOf[org.eclipse.jetty.server.Handler]
-    }
+  def startJettyServer(hostName: String, port: Int, handlers: Seq[ServletContextHandler],
+      conf: SparkConf): (Server, Int) = {
 
+    addFilters(handlers, conf)
     val handlerList = new HandlerList
-    handlerList.setHandlers(handlersToRegister.toArray)
+    handlerList.setHandlers(handlers.toArray)
 
     @tailrec
     def connect(currentPort: Int): (Server, Int) = {
@@ -119,7 +165,9 @@ private[spark] object JettyUtils extends Logging {
       server.setThreadPool(pool)
       server.setHandler(handlerList)
 
-      Try { server.start() } match {
+      Try {
+        server.start()
+      } match {
         case s: Success[_] =>
           (server, server.getConnectors.head.getLocalPort)
         case f: Failure[_] =>

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index af6b658..ca82c3d 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.ui
 
-import org.eclipse.jetty.server.{Handler, Server}
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.{Logging, SparkContext, SparkEnv}
 import org.apache.spark.ui.JettyUtils._
@@ -34,9 +37,9 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
   var boundPort: Option[Int] = None
   var server: Option[Server] = None
 
-  val handlers = Seq[(String, Handler)](
-    ("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)),
-    ("/", createRedirectHandler("/stages"))
+  val handlers = Seq[ServletContextHandler] (
+    createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static/*"),
+    createRedirectHandler("/stages", "/")
   )
   val storage = new BlockManagerUI(sc)
   val jobs = new JobProgressUI(sc)
@@ -52,7 +55,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
   /** Bind the HTTP server which backs this web interface */
   def bind() {
     try {
-      val (srv, usedPort) = JettyUtils.startJettyServer(host, port, allHandlers)
+      val (srv, usedPort) = JettyUtils.startJettyServer(host, port, allHandlers, sc.conf)
       logInfo("Started Spark Web UI at http://%s:%d".format(host, usedPort))
       server = Some(srv)
       boundPort = Some(usedPort)
@@ -83,5 +86,5 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
 
 private[spark] object SparkUI {
   val DEFAULT_PORT = "4040"
-  val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
+  val STATIC_RESOURCE_DIR = "org/apache/spark/ui"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index 9e7cdc8..1433347 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConversions._
 import scala.util.Properties
 import scala.xml.Node
 
-import org.eclipse.jetty.server.Handler
+import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.SparkContext
 import org.apache.spark.ui.JettyUtils._
@@ -32,8 +32,9 @@ import org.apache.spark.ui.UIUtils
 
 private[spark] class EnvironmentUI(sc: SparkContext) {
 
-  def getHandlers = Seq[(String, Handler)](
-    ("/environment", (request: HttpServletRequest) => envDetails(request))
+  def getHandlers = Seq[ServletContextHandler](
+    createServletHandler("/environment",
+      createServlet((request: HttpServletRequest) => envDetails(request), sc.env.securityManager))
   )
 
   def envDetails(request: HttpServletRequest): Seq[Node] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index 1f3b7a4..4235cfe 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletRequest
 import scala.collection.mutable.{HashMap, HashSet}
 import scala.xml.Node
 
-import org.eclipse.jetty.server.Handler
+import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.{ExceptionFailure, Logging, SparkContext}
 import org.apache.spark.executor.TaskMetrics
@@ -43,8 +43,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
     sc.addSparkListener(listener)
   }
 
-  def getHandlers = Seq[(String, Handler)](
-    ("/executors", (request: HttpServletRequest) => render(request))
+  def getHandlers = Seq[ServletContextHandler](
+    createServletHandler("/executors", createServlet((request: HttpServletRequest) => render
+      (request), sc.env.securityManager))
   )
 
   def render(request: HttpServletRequest): Seq[Node] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index 557bce6..2d95d47 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest
 import scala.Seq
 
 import org.eclipse.jetty.server.Handler
+import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.SparkContext
 import org.apache.spark.ui.JettyUtils._
@@ -45,9 +46,15 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
 
   def formatDuration(ms: Long) = Utils.msDurationToString(ms)
 
-  def getHandlers = Seq[(String, Handler)](
-    ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
-    ("/stages/pool", (request: HttpServletRequest) => poolPage.render(request)),
-    ("/stages", (request: HttpServletRequest) => indexPage.render(request))
+  def getHandlers = Seq[ServletContextHandler](
+    createServletHandler("/stages/stage",
+      createServlet((request: HttpServletRequest) => stagePage.render(request),
+        sc.env.securityManager)),
+    createServletHandler("/stages/pool",
+      createServlet((request: HttpServletRequest) => poolPage.render(request),
+        sc.env.securityManager)),
+    createServletHandler("/stages",
+      createServlet((request: HttpServletRequest) => indexPage.render(request),
+        sc.env.securityManager))
   )
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
index dc18eab..cb2083e 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.storage
 
 import javax.servlet.http.HttpServletRequest
 
-import org.eclipse.jetty.server.Handler
+import org.eclipse.jetty.servlet.ServletContextHandler
 
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.ui.JettyUtils._
@@ -29,8 +29,12 @@ private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
   val indexPage = new IndexPage(this)
   val rddPage = new RDDPage(this)
 
-  def getHandlers = Seq[(String, Handler)](
-    ("/storage/rdd", (request: HttpServletRequest) => rddPage.render(request)),
-    ("/storage", (request: HttpServletRequest) => indexPage.render(request))
+  def getHandlers = Seq[ServletContextHandler](
+    createServletHandler("/storage/rdd",
+      createServlet((request: HttpServletRequest) => rddPage.render(request),
+      sc.env.securityManager)),
+    createServletHandler("/storage",
+      createServlet((request: HttpServletRequest) => indexPage.render(request),
+      sc.env.securityManager))
   )
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index f26ed47..a6c9a9a 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -24,12 +24,12 @@ import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem}
 import com.typesafe.config.ConfigFactory
 import org.apache.log4j.{Level, Logger}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /**
  * Various utility classes for working with Akka.
  */
-private[spark] object AkkaUtils {
+private[spark] object AkkaUtils extends Logging {
 
   /**
    * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
@@ -42,7 +42,7 @@ private[spark] object AkkaUtils {
    * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
    */
   def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false,
-    conf: SparkConf): (ActorSystem, Int) = {
+    conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = {
 
     val akkaThreads   = conf.getInt("spark.akka.threads", 4)
     val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
@@ -65,6 +65,15 @@ private[spark] object AkkaUtils {
       conf.getDouble("spark.akka.failure-detector.threshold", 300.0)
     val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
 
+    val secretKey = securityManager.getSecretKey()
+    val isAuthOn = securityManager.isAuthenticationEnabled()
+    if (isAuthOn && secretKey == null) {
+      throw new Exception("Secret key is null with authentication on")
+    }
+    val requireCookie = if (isAuthOn) "on" else "off"
+    val secureCookie = if (isAuthOn) secretKey else ""
+    logDebug("In createActorSystem, requireCookie is: " + requireCookie)
+
     val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
       ConfigFactory.parseString(
       s"""
@@ -72,6 +81,8 @@ private[spark] object AkkaUtils {
       |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
       |akka.stdout-loglevel = "ERROR"
       |akka.jvm-exit-on-fatal-error = off
+      |akka.remote.require-cookie = "$requireCookie"
+      |akka.remote.secure-cookie = "$secureCookie"
       |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
       |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
       |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8e69f1d..0eb2f78 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.util
 
 import java.io._
-import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL}
+import java.net.{InetAddress, Inet4Address, NetworkInterface, URI, URL, URLConnection}
 import java.nio.ByteBuffer
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
@@ -33,10 +33,11 @@ import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 
-import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 import org.apache.spark.deploy.SparkHadoopUtil
 
+
 /**
  * Various utility methods used by Spark.
  */
@@ -233,13 +234,29 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Construct a URI container information used for authentication.
+   * This also sets the default authenticator to properly negotiation the
+   * user/password based on the URI.
+   *
+   * Note this relies on the Authenticator.setDefault being set properly to decode
+   * the user name and password. This is currently set in the SecurityManager.
+   */
+  def constructURIForAuthentication(uri: URI, securityMgr: SecurityManager): URI = {
+    val userCred = securityMgr.getSecretKey()
+    if (userCred == null) throw new Exception("Secret key is null with authentication on")
+    val userInfo = securityMgr.getHttpUser()  + ":" + userCred
+    new URI(uri.getScheme(), userInfo, uri.getHost(), uri.getPort(), uri.getPath(), 
+      uri.getQuery(), uri.getFragment())
+  }
+
+  /**
    * Download a file requested by the executor. Supports fetching the file in a variety of ways,
    * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
    *
    * Throws SparkException if the target file already exists and has different contents than
    * the requested file.
    */
-  def fetchFile(url: String, targetDir: File, conf: SparkConf) {
+  def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager) {
     val filename = url.split("/").last
     val tempDir = getLocalDir(conf)
     val tempFile =  File.createTempFile("fetchFileTemp", null, new File(tempDir))
@@ -249,7 +266,19 @@ private[spark] object Utils extends Logging {
     uri.getScheme match {
       case "http" | "https" | "ftp" =>
         logInfo("Fetching " + url + " to " + tempFile)
-        val in = new URL(url).openStream()
+
+        var uc: URLConnection = null
+        if (securityMgr.isAuthenticationEnabled()) {
+          logDebug("fetchFile with security enabled")
+          val newuri = constructURIForAuthentication(uri, securityMgr)
+          uc = newuri.toURL().openConnection()
+          uc.setAllowUserInteraction(false)
+        } else {
+          logDebug("fetchFile not using security")
+          uc = new URL(url).openConnection()
+        }
+
+        val in = uc.getInputStream();
         val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
new file mode 100644
index 0000000..cd054c1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+
+import akka.actor._
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.AkkaUtils
+import scala.concurrent.Await
+
+/**
+  * Test the AkkaUtils with various security settings.
+  */
+class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
+
+  test("remote fetch security bad password") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.authenticate.secret", "good")
+
+    val securityManager = new SecurityManager(conf);
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, 
+      conf = conf, securityManager = securityManager)
+    System.setProperty("spark.driver.port", boundPort.toString)    // Will be cleared by LocalSparkContext
+    System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+    assert(securityManager.isAuthenticationEnabled() === true)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+        Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+
+    val badconf = new SparkConf
+    badconf.set("spark.authenticate", "true")
+    badconf.set("spark.authenticate.secret", "bad")
+    val securityManagerBad = new SecurityManager(badconf);
+
+    assert(securityManagerBad.isAuthenticationEnabled() === true)
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, 
+      conf = conf, securityManager = securityManagerBad)
+    val slaveTracker = new MapOutputTracker(conf)
+    val selection = slaveSystem.actorSelection(
+      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    intercept[akka.actor.ActorNotFound] { 
+      slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) 
+    }
+
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
+  }
+
+  test("remote fetch security off") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "false")
+    conf.set("spark.authenticate.secret", "bad")
+    val securityManager = new SecurityManager(conf);
+
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, 
+      conf = conf, securityManager = securityManager)
+    System.setProperty("spark.driver.port", boundPort.toString)    // Will be cleared by LocalSparkContext
+    System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+    assert(securityManager.isAuthenticationEnabled() === false)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+        Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+
+    val badconf = new SparkConf
+    badconf.set("spark.authenticate", "false")
+    badconf.set("spark.authenticate.secret", "good")
+    val securityManagerBad = new SecurityManager(badconf);
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, 
+      conf = badconf, securityManager = securityManagerBad)
+    val slaveTracker = new MapOutputTracker(conf)
+    val selection = slaveSystem.actorSelection(
+      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+
+    assert(securityManagerBad.isAuthenticationEnabled() === false)
+
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.incrementEpoch()
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+    val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+    masterTracker.registerMapOutput(10, 0, new MapStatus(
+      BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
+    masterTracker.incrementEpoch()
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    // this should succeed since security off
+    assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+           Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
+
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
+  }
+
+  test("remote fetch security pass") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.authenticate.secret", "good")
+    val securityManager = new SecurityManager(conf);
+
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, 
+      conf = conf, securityManager = securityManager)
+    System.setProperty("spark.driver.port", boundPort.toString)    // Will be cleared by LocalSparkContext
+    System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+    assert(securityManager.isAuthenticationEnabled() === true)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+        Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+
+    val goodconf = new SparkConf
+    goodconf.set("spark.authenticate", "true")
+    goodconf.set("spark.authenticate.secret", "good")
+    val securityManagerGood = new SecurityManager(goodconf);
+
+    assert(securityManagerGood.isAuthenticationEnabled() === true)
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+      conf = goodconf, securityManager = securityManagerGood)
+    val slaveTracker = new MapOutputTracker(conf)
+    val selection = slaveSystem.actorSelection(
+      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
+
+    masterTracker.registerShuffle(10, 1)
+    masterTracker.incrementEpoch()
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+    val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+    masterTracker.registerMapOutput(10, 0, new MapStatus(
+      BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
+    masterTracker.incrementEpoch()
+    slaveTracker.updateEpoch(masterTracker.getEpoch)
+
+    // this should succeed since security on and passwords match
+    assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
+           Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
+
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
+  }
+
+  test("remote fetch security off client") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.authenticate.secret", "good")
+
+    val securityManager = new SecurityManager(conf);
+
+    val hostname = "localhost"
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, 
+      conf = conf, securityManager = securityManager)
+    System.setProperty("spark.driver.port", boundPort.toString)    // Will be cleared by LocalSparkContext
+    System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+
+    assert(securityManager.isAuthenticationEnabled() === true)
+
+    val masterTracker = new MapOutputTrackerMaster(conf)
+    masterTracker.trackerActor = actorSystem.actorOf(
+        Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
+
+    val badconf = new SparkConf
+    badconf.set("spark.authenticate", "false")
+    badconf.set("spark.authenticate.secret", "bad")
+    val securityManagerBad = new SecurityManager(badconf);
+
+    assert(securityManagerBad.isAuthenticationEnabled() === false)
+
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0,
+      conf = badconf, securityManager = securityManagerBad)
+    val slaveTracker = new MapOutputTracker(conf)
+    val selection = slaveSystem.actorSelection(
+      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    intercept[akka.actor.ActorNotFound] { 
+      slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) 
+    }
+
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
index e022acc..96ba392 100644
--- a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.FunSuite
 
 class BroadcastSuite extends FunSuite with LocalSparkContext {
 
+
   override def afterEach() {
     super.afterEach()
     System.clearProperty("spark.broadcast.factory")

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/test/scala/org/apache/spark/ConnectionManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/ConnectionManagerSuite.scala
new file mode 100644
index 0000000..80f7ec0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ConnectionManagerSuite.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+
+import java.nio._
+
+import org.apache.spark.network.{ConnectionManager, Message, ConnectionManagerId}
+import scala.concurrent.Await
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration._
+
+
+/**
+  * Test the ConnectionManager with various security settings.
+  */
+class ConnectionManagerSuite extends FunSuite {
+
+  test("security default off") {
+    val conf = new SparkConf
+    val securityManager = new SecurityManager(conf)
+    val manager = new ConnectionManager(0, conf, securityManager)
+    var receivedMessage = false
+    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      receivedMessage = true
+      None
+    })
+
+    val size = 10 * 1024 * 1024
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
+    buffer.flip
+
+    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+    manager.sendMessageReliablySync(manager.id, bufferMessage)
+
+    assert(receivedMessage == true)
+
+    manager.stop()
+  }
+
+  test("security on same password") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.authenticate.secret", "good")
+    val securityManager = new SecurityManager(conf)
+    val manager = new ConnectionManager(0, conf, securityManager)
+    var numReceivedMessages = 0
+
+    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedMessages += 1
+      None
+    })
+    val managerServer = new ConnectionManager(0, conf, securityManager)
+    var numReceivedServerMessages = 0
+    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedServerMessages += 1
+      None
+    })
+
+    val size = 10 * 1024 * 1024
+    val count = 10
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
+    buffer.flip
+
+    (0 until count).map(i => {
+      val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+      manager.sendMessageReliablySync(managerServer.id, bufferMessage)
+    })
+
+    assert(numReceivedServerMessages == 10)
+    assert(numReceivedMessages == 0)
+
+    manager.stop()
+    managerServer.stop()
+  }
+
+  test("security mismatch password") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "true")
+    conf.set("spark.authenticate.secret", "good")
+    val securityManager = new SecurityManager(conf)
+    val manager = new ConnectionManager(0, conf, securityManager)
+    var numReceivedMessages = 0
+
+    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedMessages += 1
+      None
+    })
+
+    val badconf = new SparkConf
+    badconf.set("spark.authenticate", "true")
+    badconf.set("spark.authenticate.secret", "bad")
+    val badsecurityManager = new SecurityManager(badconf)
+    val managerServer = new ConnectionManager(0, badconf, badsecurityManager)
+    var numReceivedServerMessages = 0
+
+    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedServerMessages += 1
+      None
+    })
+
+    val size = 10 * 1024 * 1024
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
+    buffer.flip
+    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+    manager.sendMessageReliablySync(managerServer.id, bufferMessage)
+
+    assert(numReceivedServerMessages == 0)
+    assert(numReceivedMessages == 0)
+
+    manager.stop()
+    managerServer.stop()
+  }
+
+  test("security mismatch auth off") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "false")
+    conf.set("spark.authenticate.secret", "good")
+    val securityManager = new SecurityManager(conf)
+    val manager = new ConnectionManager(0, conf, securityManager)
+    var numReceivedMessages = 0
+
+    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedMessages += 1
+      None
+    })
+
+    val badconf = new SparkConf
+    badconf.set("spark.authenticate", "true")
+    badconf.set("spark.authenticate.secret", "good")
+    val badsecurityManager = new SecurityManager(badconf)
+    val managerServer = new ConnectionManager(0, badconf, badsecurityManager)
+    var numReceivedServerMessages = 0
+    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedServerMessages += 1
+      None
+    })
+
+    val size = 10 * 1024 * 1024
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
+    buffer.flip
+    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+    (0 until 1).map(i => {
+      val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+      manager.sendMessageReliably(managerServer.id, bufferMessage)
+    }).foreach(f => {
+      try {
+        val g = Await.result(f, 1 second)
+        assert(false)
+      } catch {
+        case e: TimeoutException => {
+          // we should timeout here since the client can't do the negotiation
+          assert(true)
+        }
+      }
+    })
+
+    assert(numReceivedServerMessages == 0)
+    assert(numReceivedMessages == 0)
+    manager.stop()
+    managerServer.stop()
+  }
+
+  test("security auth off") {
+    val conf = new SparkConf
+    conf.set("spark.authenticate", "false")
+    val securityManager = new SecurityManager(conf)
+    val manager = new ConnectionManager(0, conf, securityManager)
+    var numReceivedMessages = 0
+
+    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedMessages += 1
+      None
+    })
+
+    val badconf = new SparkConf
+    badconf.set("spark.authenticate", "false")
+    val badsecurityManager = new SecurityManager(badconf)
+    val managerServer = new ConnectionManager(0, badconf, badsecurityManager)
+    var numReceivedServerMessages = 0
+
+    managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+      numReceivedServerMessages += 1
+      None
+    })
+
+    val size = 10 * 1024 * 1024
+    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
+    buffer.flip
+    val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+    (0 until 10).map(i => {
+      val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+      manager.sendMessageReliably(managerServer.id, bufferMessage)
+    }).foreach(f => {
+      try {
+        val g = Await.result(f, 1 second)
+        if (!g.isDefined) assert(false) else assert(true)
+      } catch {
+        case e: Exception => {
+          assert(false)
+        }
+      }
+    })
+    assert(numReceivedServerMessages == 10)
+    assert(numReceivedMessages == 0)
+
+    manager.stop()
+    managerServer.stop()
+  }
+
+
+
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/test/scala/org/apache/spark/DriverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index e0e8011..9cbdfc5 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.util.Utils
 
 class DriverSuite extends FunSuite with Timeouts {
+
   test("driver should exit after finishing") {
     val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
     // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
index 9be67b3..aee9ab9 100644
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -30,6 +30,12 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   @transient var tmpFile: File = _
   @transient var tmpJarUrl: String = _
 
+  override def beforeEach() {
+    super.beforeEach()
+    resetSparkContext()
+    System.setProperty("spark.authenticate", "false")
+  }
+
   override def beforeAll() {
     super.beforeAll()
     val tmpDir = new File(Files.createTempDir(), "test")
@@ -43,6 +49,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     val jarFile = new File(tmpDir, "test.jar")
     val jarStream = new FileOutputStream(jarFile)
     val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
+    System.setProperty("spark.authenticate", "false")
 
     val jarEntry = new JarEntry(textFile.getName)
     jar.putNextEntry(jarEntry)
@@ -77,6 +84,25 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
     assert(result.toSet === Set((1,200), (2,300), (3,500)))
   }
 
+  test("Distributing files locally security On") {
+    val sparkConf = new SparkConf(false)
+    sparkConf.set("spark.authenticate", "true")
+    sparkConf.set("spark.authenticate.secret", "good")
+    sc = new SparkContext("local[4]", "test", sparkConf)
+
+    sc.addFile(tmpFile.toString)
+    assert(sc.env.securityManager.isAuthenticationEnabled() === true)
+    val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+    val result = sc.parallelize(testData).reduceByKey {
+      val path = SparkFiles.get("FileServerSuite.txt")
+      val in = new BufferedReader(new FileReader(path))
+      val fileVal = in.readLine().toInt
+      in.close()
+      _ * fileVal + _ * fileVal
+    }.collect()
+    assert(result.toSet === Set((1,200), (2,300), (3,500)))
+  }
+
   test("Distributing files locally using URL as input") {
     // addFile("file:///....")
     sc = new SparkContext("local[4]", "test")

http://git-wip-us.apache.org/repos/asf/spark/blob/7edbea41/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 6c1e325..8efa072 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -98,14 +98,16 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
 
   test("remote fetch") {
     val hostname = "localhost"
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf)
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
+      securityManager = new SecurityManager(conf))
     System.setProperty("spark.driver.port", boundPort.toString)    // Will be cleared by LocalSparkContext
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
         Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
 
-    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf)
+    val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf,
+      securityManager = new SecurityManager(conf))
     val slaveTracker = new MapOutputTracker(conf)
     val selection = slaveSystem.actorSelection(
       s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")