You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/05/29 04:17:02 UTC

[spark] branch branch-3.0 updated: [SPARK-31833][SQL][TEST-HIVE1.2] Set HiveThriftServer2 with actual port while configured 0

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d2fb6e4  [SPARK-31833][SQL][TEST-HIVE1.2] Set HiveThriftServer2 with actual port while configured 0
d2fb6e4 is described below

commit d2fb6e408538e89ac5cc3b3c71925dbaca387bbc
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Fri May 29 04:13:23 2020 +0000

    [SPARK-31833][SQL][TEST-HIVE1.2] Set HiveThriftServer2 with actual port while configured 0
    
    ### What changes were proposed in this pull request?
    
    When I was developing some stuff based on the `DeveloperAPI ` `org.apache.spark.sql.hive.thriftserver.HiveThriftServer2#startWithContext`, I need to use thrift port randomly to avoid race on ports. But the `org.apache.hive.service.cli.thrift.ThriftCLIService#getPortNumber` do not respond to me with the actual bound port but always 0.
    
    And the server log is not right too, after starting the server, it's hard to form to the right JDBC connection string.
    
    ```
    INFO ThriftCLIService: Starting ThriftBinaryCLIService on port 0 with 5...500 worker threads
    ```
    
    Indeed, the `53742` is the right port
    ```shell
    lsof -nP -p `cat ./pid/spark-kentyao-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1.pid` | grep LISTEN
    java    18990 kentyao  288u    IPv6 0x22858e3e60d6a0a7      0t0                 TCP 10.242.189.214:53723 (LISTEN)
    java    18990 kentyao  290u    IPv6 0x22858e3e60d68827      0t0                 TCP *:4040 (LISTEN)
    java    18990 kentyao  366u    IPv6 0x22858e3e60d66987      0t0                 TCP 10.242.189.214:53724 (LISTEN)
    java    18990 kentyao  438u    IPv6 0x22858e3e60d65d47      0t0                 TCP *:53742 (LISTEN)
    ```
    
    In the PR, when the port is configured 0, the `portNum` will be set to the real used port during the start process.
    
    Also use 0 in thrift related tests to avoid potential flakiness.
    
    ### Why are the changes needed?
    
    1 fix API bug
    2 reduce test flakiness
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, `org.apache.hive.service.cli.thrift.ThriftCLIService#getPortNumber` will always give you the actual port when it is configured 0.
    
    ### How was this patch tested?
    
    modified unit tests
    
    Closes #28651 from yaooqinn/SPARK-31833.
    
    Authored-by: Kent Yao <ya...@hotmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit fe1da296da152528fc159c23e3026a0c19343780)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/hive/thriftserver/SharedThriftServer.scala | 33 ++++++++++++----------
 .../service/cli/thrift/ThriftBinaryCLIService.java |  4 +++
 .../service/cli/thrift/ThriftHttpCLIService.java   |  3 ++
 .../service/cli/thrift/ThriftBinaryCLIService.java |  4 +++
 .../service/cli/thrift/ThriftHttpCLIService.java   |  3 ++
 5 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
index ce61009..e002bc0 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
@@ -19,29 +19,25 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.sql.{DriverManager, Statement}
 
+import scala.collection.JavaConverters._
 import scala.concurrent.duration._
-import scala.util.{Random, Try}
+import scala.util.Try
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hive.service.cli.thrift.ThriftCLIService
 
-import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSparkSession
 
 trait SharedThriftServer extends SharedSparkSession {
 
   private var hiveServer2: HiveThriftServer2 = _
+  private var serverPort: Int = 0
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    // Chooses a random port between 10000 and 19999
-    var listeningPort = 10000 + Random.nextInt(10000)
-
     // Retries up to 3 times with different port numbers if the server fails to start
-    (1 to 3).foldLeft(Try(startThriftServer(listeningPort, 0))) { case (started, attempt) =>
-      started.orElse {
-        listeningPort += 1
-        Try(startThriftServer(listeningPort, attempt))
-      }
+    (1 to 3).foldLeft(Try(startThriftServer(0))) { case (started, attempt) =>
+      started.orElse(Try(startThriftServer(attempt)))
     }.recover {
       case cause: Throwable =>
         throw cause
@@ -59,8 +55,7 @@ trait SharedThriftServer extends SharedSparkSession {
 
   protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = {
     val user = System.getProperty("user.name")
-
-    val serverPort = hiveServer2.getHiveConf.get(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname)
+    require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2")
     val connections =
       fs.map { _ => DriverManager.getConnection(s"jdbc:hive2://localhost:$serverPort", user, "") }
     val statements = connections.map(_.createStatement())
@@ -73,11 +68,19 @@ trait SharedThriftServer extends SharedSparkSession {
     }
   }
 
-  private def startThriftServer(port: Int, attempt: Int): Unit = {
-    logInfo(s"Trying to start HiveThriftServer2: port=$port, attempt=$attempt")
+  private def startThriftServer(attempt: Int): Unit = {
+    logInfo(s"Trying to start HiveThriftServer2:, attempt=$attempt")
     val sqlContext = spark.newSession().sqlContext
-    sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, port.toString)
+    // Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any free port to use.
+    // It's much more robust than set a random port generated by ourselves ahead
+    sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
     hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
+    hiveServer2.getServices.asScala.foreach {
+      case t: ThriftCLIService if t.getPortNumber != 0 =>
+        serverPort = t.getPortNumber
+        logInfo(s"Started HiveThriftServer2: port=$serverPort, attempt=$attempt")
+      case _ =>
+    }
 
     // Wait for thrift server to be ready to serve the query, via executing simple query
     // till the query succeeds. See SPARK-30345 for more details.
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index 21b8bf7..e1ee503 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -76,6 +76,10 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
             keyStorePassword, sslVersionBlacklist);
       }
 
+      // In case HIVE_SERVER2_THRIFT_PORT or hive.server2.thrift.port is configured with 0 which
+      // represents any free port, we should set it to the actual one
+      portNum = serverSocket.getServerSocket().getLocalPort();
+
       // Server args
       int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
       int requestTimeout = (int) hiveConf.getTimeVar(
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
index 504e63d..1099a00 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
@@ -143,6 +143,9 @@ public class ThriftHttpCLIService extends ThriftCLIService {
       // TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc.
       // Finally, start the server
       httpServer.start();
+      // In case HIVE_SERVER2_THRIFT_HTTP_PORT or hive.server2.thrift.http.port is configured with
+      // 0 which represents any free port, we should set it to the actual one
+      portNum = connector.getLocalPort();
       String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName
           + " mode on port " + connector.getLocalPort()+ " path=" + httpPath + " with " + minWorkerThreads + "..."
           + maxWorkerThreads + " worker threads";
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index fc19c65..a7de9c0 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -77,6 +77,10 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
             keyStorePassword, sslVersionBlacklist);
       }
 
+      // In case HIVE_SERVER2_THRIFT_PORT or hive.server2.thrift.port is configured with 0 which
+      // represents any free port, we should set it to the actual one
+      portNum = serverSocket.getServerSocket().getLocalPort();
+
       // Server args
       int maxMessageSize = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_MAX_MESSAGE_SIZE);
       int requestTimeout = (int) hiveConf.getTimeVar(
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
index 08626e7..73d5f84 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
@@ -144,6 +144,9 @@ public class ThriftHttpCLIService extends ThriftCLIService {
       // TODO: check defaults: maxTimeout, keepalive, maxBodySize, bodyRecieveDuration, etc.
       // Finally, start the server
       httpServer.start();
+      // In case HIVE_SERVER2_THRIFT_HTTP_PORT or hive.server2.thrift.http.port is configured with
+      // 0 which represents any free port, we should set it to the actual one
+      portNum = connector.getLocalPort();
       String msg = "Started " + ThriftHttpCLIService.class.getSimpleName() + " in " + schemeName
           + " mode on port " + portNum + " path=" + httpPath + " with " + minWorkerThreads + "..."
           + maxWorkerThreads + " worker threads";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org