You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/20 19:48:15 UTC

spark git commit: [SPARK-14725][CORE] Remove HttpServer class

Repository: spark
Updated Branches:
  refs/heads/master b4e76a9a3 -> 90cbc82fd


[SPARK-14725][CORE] Remove HttpServer class

## What changes were proposed in this pull request?

This proposal removes the `HttpServer` class. Now that internal file/jar/class transmission has moved to the RPC layer, no code uses `HttpServer` anymore, so this change removes it.

## How was this patch tested?

Unit tests were verified locally.

Author: jerryshao <ss...@hortonworks.com>

Closes #12526 from jerryshao/SPARK-14725.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90cbc82f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90cbc82f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90cbc82f

Branch: refs/heads/master
Commit: 90cbc82fd4114219a5a0f180b1908a18985fda3e
Parents: b4e76a9
Author: jerryshao <ss...@hortonworks.com>
Authored: Wed Apr 20 10:48:11 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Apr 20 10:48:11 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/HttpServer.scala     | 181 -------------------
 .../spark/repl/ExecutorClassLoaderSuite.scala   |  53 ------
 2 files changed, 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90cbc82f/core/src/main/scala/org/apache/spark/HttpServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
deleted file mode 100644
index 982b6d6..0000000
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.File
-
-import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService}
-import org.eclipse.jetty.security.authentication.DigestAuthenticator
-import org.eclipse.jetty.server.Server
-import org.eclipse.jetty.server.bio.SocketConnector
-import org.eclipse.jetty.server.ssl.SslSocketConnector
-import org.eclipse.jetty.servlet.{DefaultServlet, ServletContextHandler, ServletHolder}
-import org.eclipse.jetty.util.component.LifeCycle
-import org.eclipse.jetty.util.security.{Constraint, Password}
-import org.eclipse.jetty.util.thread.QueuedThreadPool
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * Exception type thrown by HttpServer when it is in the wrong state for an operation.
- */
-private[spark] class ServerStateException(message: String) extends Exception(message)
-
-/**
- * An HTTP server for static content used to allow worker nodes to access JARs added to SparkContext
- * as well as classes created by the interpreter when the user types in code. This is just a wrapper
- * around a Jetty server.
- */
-private[spark] class HttpServer(
-    conf: SparkConf,
-    resourceBase: File,
-    securityManager: SecurityManager,
-    requestedPort: Int = 0,
-    serverName: String = "HTTP server")
-  extends Logging {
-
-  private var server: Server = null
-  private var port: Int = requestedPort
-  private val servlets = {
-    val handler = new ServletContextHandler()
-    handler.setContextPath("/")
-    handler
-  }
-
-  def start() {
-    if (server != null) {
-      throw new ServerStateException("Server is already started")
-    } else {
-      logInfo("Starting HTTP Server")
-      val (actualServer, actualPort) =
-        Utils.startServiceOnPort[Server](requestedPort, doStart, conf, serverName)
-      server = actualServer
-      port = actualPort
-    }
-  }
-
-  def addDirectory(contextPath: String, resourceBase: String): Unit = {
-    val holder = new ServletHolder()
-    holder.setInitParameter("resourceBase", resourceBase)
-    holder.setInitParameter("pathInfoOnly", "true")
-    holder.setServlet(new DefaultServlet())
-    servlets.addServlet(holder, contextPath.stripSuffix("/") + "/*")
-  }
-
-  /**
-   * Actually start the HTTP server on the given port.
-   *
-   * Note that this is only best effort in the sense that we may end up binding to a nearby port
-   * in the event of port collision. Return the bound server and the actual port used.
-   */
-  private def doStart(startPort: Int): (Server, Int) = {
-    val server = new Server()
-
-    val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory()
-      .map(new SslSocketConnector(_)).getOrElse(new SocketConnector)
-
-    connector.setMaxIdleTime(60 * 1000)
-    connector.setSoLingerTime(-1)
-    connector.setPort(startPort)
-    server.addConnector(connector)
-
-    val threadPool = new QueuedThreadPool
-    threadPool.setDaemon(true)
-    server.setThreadPool(threadPool)
-    addDirectory("/", resourceBase.getAbsolutePath)
-
-    if (securityManager.isAuthenticationEnabled()) {
-      logDebug("HttpServer is using security")
-      val sh = setupSecurityHandler(securityManager)
-      // make sure we go through security handler to get resources
-      sh.setHandler(servlets)
-      server.setHandler(sh)
-    } else {
-      logDebug("HttpServer is not using security")
-      server.setHandler(servlets)
-    }
-
-    server.start()
-    val actualPort = server.getConnectors()(0).getLocalPort
-
-    (server, actualPort)
-  }
-
-  /**
-   * Setup Jetty to the HashLoginService using a single user with our
-   * shared secret. Configure it to use DIGEST-MD5 authentication so that the password
-   * isn't passed in plaintext.
-   */
-  private def setupSecurityHandler(securityMgr: SecurityManager): ConstraintSecurityHandler = {
-    val constraint = new Constraint()
-    // use DIGEST-MD5 as the authentication mechanism
-    constraint.setName(Constraint.__DIGEST_AUTH)
-    constraint.setRoles(Array("user"))
-    constraint.setAuthenticate(true)
-    constraint.setDataConstraint(Constraint.DC_NONE)
-
-    val cm = new ConstraintMapping()
-    cm.setConstraint(constraint)
-    cm.setPathSpec("/*")
-    val sh = new ConstraintSecurityHandler()
-
-    // the hashLoginService lets us do a single user and
-    // secret right now. This could be changed to use the
-    // JAASLoginService for other options.
-    val hashLogin = new HashLoginService()
-
-    val userCred = new Password(securityMgr.getSecretKey())
-    if (userCred == null) {
-      throw new Exception("Error: secret key is null with authentication on")
-    }
-    hashLogin.putUser(securityMgr.getHttpUser(), userCred, Array("user"))
-    sh.setLoginService(hashLogin)
-    sh.setAuthenticator(new DigestAuthenticator());
-    sh.setConstraintMappings(Array(cm))
-    sh
-  }
-
-  def stop() {
-    if (server == null) {
-      throw new ServerStateException("Server is already stopped")
-    } else {
-      server.stop()
-      // Stop the ThreadPool if it supports stop() method (through LifeCycle).
-      // It is needed because stopping the Server won't stop the ThreadPool it uses.
-      val threadPool = server.getThreadPool
-      if (threadPool != null && threadPool.isInstanceOf[LifeCycle]) {
-        threadPool.asInstanceOf[LifeCycle].stop
-      }
-      port = -1
-      server = null
-    }
-  }
-
-  /**
-   * Get the URI of this HTTP server (http://host:port or https://host:port)
-   */
-  def uri: String = {
-    if (server == null) {
-      throw new ServerStateException("Server is not started")
-    } else {
-      val scheme = if (securityManager.fileServerSSLOptions.enabled) "https" else "http"
-      s"$scheme://${Utils.localHostNameForURI()}:$port"
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/90cbc82f/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
index 9a143ee..12e9856 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ExecutorClassLoaderSuite.scala
@@ -57,7 +57,6 @@ class ExecutorClassLoaderSuite
   var tempDir2: File = _
   var url1: String = _
   var urls2: Array[URL] = _
-  var classServer: HttpServer = _
 
   override def beforeAll() {
     super.beforeAll()
@@ -74,9 +73,6 @@ class ExecutorClassLoaderSuite
 
   override def afterAll() {
     try {
-      if (classServer != null) {
-        classServer.stop()
-      }
       Utils.deleteRecursively(tempDir1)
       Utils.deleteRecursively(tempDir2)
       SparkEnv.set(null)
@@ -137,55 +133,6 @@ class ExecutorClassLoaderSuite
     assert(fileReader.readLine().contains("resource"), "File doesn't contain 'resource'")
   }
 
-  test("failing to fetch classes from HTTP server should not leak resources (SPARK-6209)") {
-    // This is a regression test for SPARK-6209, a bug where each failed attempt to load a class
-    // from the driver's class server would leak a HTTP connection, causing the class server's
-    // thread / connection pool to be exhausted.
-    val conf = new SparkConf()
-    val securityManager = new SecurityManager(conf)
-    classServer = new HttpServer(conf, tempDir1, securityManager)
-    classServer.start()
-    // ExecutorClassLoader uses SparkEnv's SecurityManager, so we need to mock this
-    val mockEnv = mock[SparkEnv]
-    when(mockEnv.securityManager).thenReturn(securityManager)
-    SparkEnv.set(mockEnv)
-    // Create an ExecutorClassLoader that's configured to load classes from the HTTP server
-    val parentLoader = new URLClassLoader(Array.empty, null)
-    val classLoader = new ExecutorClassLoader(conf, null, classServer.uri, parentLoader, false)
-    classLoader.httpUrlConnectionTimeoutMillis = 500
-    // Check that this class loader can actually load classes that exist
-    val fakeClass = classLoader.loadClass("ReplFakeClass2").newInstance()
-    val fakeClassVersion = fakeClass.toString
-    assert(fakeClassVersion === "1")
-    // Try to perform a full GC now, since GC during the test might mask resource leaks
-    System.gc()
-    // When the original bug occurs, the test thread becomes blocked in a classloading call
-    // and does not respond to interrupts.  Therefore, use a custom ScalaTest interruptor to
-    // shut down the HTTP server when the test times out
-    val interruptor: Interruptor = new Interruptor {
-      override def apply(thread: Thread): Unit = {
-        classServer.stop()
-        classServer = null
-        thread.interrupt()
-      }
-    }
-    def tryAndFailToLoadABunchOfClasses(): Unit = {
-      // The number of trials here should be much larger than Jetty's thread / connection limit
-      // in order to expose thread or connection leaks
-      for (i <- 1 to 1000) {
-        if (Thread.currentThread().isInterrupted) {
-          throw new InterruptedException()
-        }
-        // Incorporate the iteration number into the class name in order to avoid any response
-        // caching that might be added in the future
-        intercept[ClassNotFoundException] {
-          classLoader.loadClass(s"ReplFakeClassDoesNotExist$i").newInstance()
-        }
-      }
-    }
-    failAfter(10 seconds)(tryAndFailToLoadABunchOfClasses())(interruptor)
-  }
-
   test("fetch classes using Spark's RpcEnv") {
     val env = mock[SparkEnv]
     val rpcEnv = mock[RpcEnv]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org