Posted to commits@spark.apache.org by ya...@apache.org on 2020/05/07 05:38:26 UTC

[spark] branch master updated: [SPARK-31631][TESTS] Fix test flakiness caused by MiniKdc which throws 'address in use' BindException with retry

This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bd6b53c  [SPARK-31631][TESTS] Fix test flakiness caused by MiniKdc which throws 'address in use' BindException with retry
bd6b53c is described below

commit bd6b53cc0ba93f7f1ff8e00ccc366cd02a24d72a
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Thu May 7 14:37:03 2020 +0900

    [SPARK-31631][TESTS] Fix test flakiness caused by MiniKdc which throws 'address in use' BindException with retry
    
    ### What changes were proposed in this pull request?
    The `Kafka*Suite`s are flaky because of the Hadoop MiniKdc issue - https://issues.apache.org/jira/browse/HADOOP-12656
    > Looking at MiniKdc implementation, if port is 0, the constructor use ServerSocket to find an unused port, assign the port number to the member variable port and close the ServerSocket object; later, in initKDCServer(), instantiate a TcpTransport object and bind at that port.
    
    > It appears that the port may be used in between, and then throw the exception.
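    
    To make that window concrete, here is a minimal sketch using only `java.net.ServerSocket`. The object `PortRaceSketch` and its helper `pickFreePort` are hypothetical names for illustration, not MiniKdc code, and the "intruder" socket simply stands in for whatever other service happens to grab the port first.
    
    ```scala
    import java.net.{BindException, ServerSocket}
    
    object PortRaceSketch {
      // Mimics the pattern quoted above: ask the OS for a free port,
      // remember its number, then release the socket again.
      def pickFreePort(): Int = {
        val probe = new ServerSocket(0)
        try probe.getLocalPort finally probe.close()
      }
    
      def main(args: Array[String]): Unit = {
        val port = pickFreePort()
        // The port is no longer held here, so anything may bind it.
        val intruder = new ServerSocket(port)
        try {
          // A later bind at the remembered port is expected to fail.
          val failed =
            try { new ServerSocket(port).close(); false }
            catch { case _: BindException => true }
          println(s"second bind failed: $failed")
        } finally intruder.close()
      }
    }
    ```
    
    On a busy CI host the "intruder" is some unrelated process, which is presumably why the failure only shows up intermittently.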
    
    Related test failures are suspected to share this cause, such as https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/122225/testReport/org.apache.spark.sql.kafka010/KafkaDelegationTokenSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/
    
    ```
    [info] org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite *** ABORTED *** (15 seconds, 426 milliseconds)
    [info]   java.net.BindException: Address already in use
    [info]   at sun.nio.ch.Net.bind0(Native Method)
    [info]   at sun.nio.ch.Net.bind(Net.java:433)
    [info]   at sun.nio.ch.Net.bind(Net.java:425)
    [info]   at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
    [info]   at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    [info]   at org.apache.mina.transport.socket.nio.NioSocketAcceptor.open(NioSocketAcceptor.java:198)
    [info]   at org.apache.mina.transport.socket.nio.NioSocketAcceptor.open(NioSocketAcceptor.java:51)
    [info]   at org.apache.mina.core.polling.AbstractPollingIoAcceptor.registerHandles(AbstractPollingIoAcceptor.java:547)
    [info]   at org.apache.mina.core.polling.AbstractPollingIoAcceptor.access$400(AbstractPollingIoAcceptor.java:68)
    [info]   at org.apache.mina.core.polling.AbstractPollingIoAcceptor$Acceptor.run(AbstractPollingIoAcceptor.java:422)
    [info]   at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64)
    [info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    [info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    [info]   at java.lang.Thread.run(Thread.java:748)
    ```
    We compared the error stack trace with similar issues reported in other projects, such as
    https://issues.apache.org/jira/browse/KAFKA-3453
    https://issues.apache.org/jira/browse/HBASE-14734
    
    We can be fairly confident that they are all caused by the problem described in HADOOP-12656.
    
    In this PR, we apply the approach from HBase (retrying MiniKdc startup) as a workaround until we finally drop Hadoop 2.7.x.
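    
    The retry idea can be sketched without any test-framework dependency. The patch itself relies on ScalaTest's `eventually(timeout(10.seconds), interval(1.second))`; the helper below, `RetrySketch.retryStart`, is a hypothetical stand-in that bounds retries by attempt count instead of by time, and its parameter names are assumptions made for illustration only.
    
    ```scala
    import scala.annotation.tailrec
    import scala.util.control.NonFatal
    
    object RetrySketch {
      // Hypothetical helper: runs `start` up to `maxAttempts` times. After each
      // non-fatal failure it calls `cleanup`, waits `pauseMs`, and tries again.
      // The last failure is rethrown once the attempts are exhausted.
      def retryStart[T](maxAttempts: Int, pauseMs: Long)(start: () => T)(cleanup: () => Unit): T = {
        @tailrec
        def loop(attempt: Int): T = {
          val result: Either[Throwable, T] =
            try Right(start())
            catch {
              case NonFatal(e) =>
                cleanup() // release any half-started resource before retrying
                Left(e)
            }
          result match {
            case Right(value) => value
            case Left(e) if attempt >= maxAttempts => throw e
            case Left(_) =>
              Thread.sleep(pauseMs)
              loop(attempt + 1)
          }
        }
        loop(1)
      }
    }
    ```
    
    In the tests, `start` would correspond to constructing and starting a fresh `MiniKdc` (so that a new port is chosen on each attempt), and `cleanup` to stopping the failed instance and resetting the reference to `null`.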
    
    ### Why are the changes needed?
    
    To fix test flakiness.
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    
    The affected tests themselves pass on Jenkins.
    
    Closes #28442 from yaooqinn/SPARK-31631.
    
    Authored-by: Kent Yao <ya...@hotmail.com>
    Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
 .../HadoopDelegationTokenManagerSuite.scala        | 30 ++++++++++++++++++++--
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 29 ++++++++++++++++++---
 2 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index 275bca3..fc28968 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -19,10 +19,14 @@ package org.apache.spark.deploy.security
 
 import java.security.PrivilegedExceptionAction
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION
 import org.apache.hadoop.minikdc.MiniKdc
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -88,8 +92,30 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
       // krb5.conf. MiniKdc sets "java.security.krb5.conf" in start and removes it when stop called.
       val kdcDir = Utils.createTempDir()
       val kdcConf = MiniKdc.createConf()
-      kdc = new MiniKdc(kdcConf, kdcDir)
-      kdc.start()
+      // The port for the MiniKdc service is selected in the constructor, but the service only
+      // binds to it later in MiniKdc.start() -> MiniKdc.initKDCServer() -> KdcServer.start().
+      // In the meantime, some other service might capture the port and cause a
+      // BindException.
+      // This makes our tests, which have dedicated JVMs and rely on MiniKdc, flaky.
+      //
+      // https://issues.apache.org/jira/browse/HADOOP-12656 is fixed in Hadoop 2.8.0.
+      //
+      // The workaround here is to periodically repeat this process with a timeout, since we are
+      // using Hadoop 2.7.4 by default.
+      // https://issues.apache.org/jira/browse/SPARK-31631
+      eventually(timeout(10.seconds), interval(1.second)) {
+        try {
+          kdc = new MiniKdc(kdcConf, kdcDir)
+          kdc.start()
+        } catch {
+          case NonFatal(e) =>
+            if (kdc != null) {
+              kdc.stop()
+              kdc = null
+            }
+            throw e
+        }
+      }
 
       val krbConf = new Configuration()
       krbConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos")
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 4f84619..e1e44b1 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -27,6 +27,7 @@ import javax.security.auth.login.Configuration
 import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
+import scala.util.control.NonFatal
 
 import com.google.common.io.Files
 import kafka.api.Request
@@ -36,7 +37,7 @@ import kafka.zk.KafkaZkClient
 import org.apache.hadoop.minikdc.MiniKdc
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions, NewTopic}
+import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
@@ -134,8 +135,30 @@ class KafkaTestUtils(
     val kdcDir = Utils.createTempDir()
     val kdcConf = MiniKdc.createConf()
     kdcConf.setProperty(MiniKdc.DEBUG, "true")
-    kdc = new MiniKdc(kdcConf, kdcDir)
-    kdc.start()
+    // The port for the MiniKdc service is selected in the constructor, but the service only
+    // binds to it later in MiniKdc.start() -> MiniKdc.initKDCServer() -> KdcServer.start().
+    // In the meantime, some other service might capture the port and cause a
+    // BindException.
+    // This makes our tests, which have dedicated JVMs and rely on MiniKdc, flaky.
+    //
+    // https://issues.apache.org/jira/browse/HADOOP-12656 is fixed in Hadoop 2.8.0.
+    //
+    // The workaround here is to periodically repeat this process with a timeout, since we are
+    // using Hadoop 2.7.4 by default.
+    // https://issues.apache.org/jira/browse/SPARK-31631
+    eventually(timeout(10.seconds), interval(1.second)) {
+      try {
+        kdc = new MiniKdc(kdcConf, kdcDir)
+        kdc.start()
+      } catch {
+        case NonFatal(e) =>
+          if (kdc != null) {
+            kdc.stop()
+            kdc = null
+          }
+          throw e
+      }
+    }
     // TODO https://issues.apache.org/jira/browse/SPARK-30037
     // Need to build spark's own MiniKDC and customize krb5.conf like Kafka
     rewriteKrb5Conf()

