Posted to commits@spark.apache.org by ya...@apache.org on 2020/05/07 05:38:26 UTC
[spark] branch master updated: [SPARK-31631][TESTS] Fix test
flakiness caused by MiniKdc which throws 'address in use' BindException
with retry
This is an automated email from the ASF dual-hosted git repository.
yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bd6b53c [SPARK-31631][TESTS] Fix test flakiness caused by MiniKdc which throws 'address in use' BindException with retry
bd6b53c is described below
commit bd6b53cc0ba93f7f1ff8e00ccc366cd02a24d72a
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Thu May 7 14:37:03 2020 +0900
[SPARK-31631][TESTS] Fix test flakiness caused by MiniKdc which throws 'address in use' BindException with retry
### What changes were proposed in this pull request?
The `Kafka*Suite`s are flaky because of the Hadoop MiniKdc issue - https://issues.apache.org/jira/browse/HADOOP-12656
> Looking at the MiniKdc implementation, if the port is 0, the constructor uses a ServerSocket to find an unused port, assigns the port number to the member variable port, and closes the ServerSocket; later, in initKDCServer(), it instantiates a TcpTransport object and binds at that port.
> It appears that the port may be taken in between, which then throws the exception.
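The race described above can be reproduced without MiniKdc at all. The sketch below is a hypothetical, minimal standalone reproduction of the pattern (probe for a free port, close the probe, then bind later), not MiniKdc's actual code:

```scala
import java.net.{BindException, ServerSocket}

object PortRaceDemo {
  def main(args: Array[String]): Unit = {
    // Step 1: find a "free" port the way the MiniKdc constructor is described to,
    // by opening a ServerSocket on port 0 and immediately closing it.
    val probe = new ServerSocket(0)
    val port = probe.getLocalPort
    probe.close()

    // Step 2: in the window before the service binds, another process (here,
    // simulated in the same JVM) can take the port.
    val intruder = new ServerSocket(port)

    // Step 3: the service's own bind now fails with "Address already in use".
    try {
      val victim = new ServerSocket(port) // fails while intruder holds the port
      victim.close()
    } catch {
      case e: BindException =>
        println(s"BindException on port $port: ${e.getMessage}")
    } finally {
      intruder.close()
    }
  }
}
```

Because the window between closing the probe and the real bind is short, the bug only surfaces occasionally on busy CI machines, which is exactly the flakiness pattern seen here.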
Related test failures are suspected, such as https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/122225/testReport/org.apache.spark.sql.kafka010/KafkaDelegationTokenSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/
```
[info] org.apache.spark.sql.kafka010.KafkaDelegationTokenSuite *** ABORTED *** (15 seconds, 426 milliseconds)
[info] java.net.BindException: Address already in use
[info] at sun.nio.ch.Net.bind0(Native Method)
[info] at sun.nio.ch.Net.bind(Net.java:433)
[info] at sun.nio.ch.Net.bind(Net.java:425)
[info] at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
[info] at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
[info] at org.apache.mina.transport.socket.nio.NioSocketAcceptor.open(NioSocketAcceptor.java:198)
[info] at org.apache.mina.transport.socket.nio.NioSocketAcceptor.open(NioSocketAcceptor.java:51)
[info] at org.apache.mina.core.polling.AbstractPollingIoAcceptor.registerHandles(AbstractPollingIoAcceptor.java:547)
[info] at org.apache.mina.core.polling.AbstractPollingIoAcceptor.access$400(AbstractPollingIoAcceptor.java:68)
[info] at org.apache.mina.core.polling.AbstractPollingIoAcceptor$Acceptor.run(AbstractPollingIoAcceptor.java:422)
[info] at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64)
[info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info] at java.lang.Thread.run(Thread.java:748)
```
After comparing the error stack trace with similar issues reported in other projects, such as
https://issues.apache.org/jira/browse/KAFKA-3453
https://issues.apache.org/jira/browse/HBASE-14734
we can be confident that they are caused by the same problem reported in HADOOP-12656.
In this PR, we apply the HBase approach first, until we eventually drop support for Hadoop 2.7.x.
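The retry pattern used in the diff below relies on ScalaTest's `eventually(timeout(10.seconds), interval(1.second))`. A hypothetical standalone equivalent, shown here only to illustrate the semantics (repeat the operation at a fixed interval until it succeeds or the deadline passes), could look like this:

```scala
import scala.util.control.NonFatal

object RetryDemo {
  // Hypothetical helper mirroring ScalaTest's eventually(timeout, interval):
  // retry `op` every `intervalMs` until it succeeds or `timeoutMs` elapses,
  // rethrowing the last failure if the deadline is exceeded.
  def retryWithTimeout[T](timeoutMs: Long, intervalMs: Long)(op: => T): T = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var lastError: Throwable = null
    while (System.currentTimeMillis() < deadline) {
      try {
        return op
      } catch {
        case NonFatal(e) =>
          lastError = e
          Thread.sleep(intervalMs)
      }
    }
    throw new RuntimeException("operation did not succeed within timeout", lastError)
  }

  def main(args: Array[String]): Unit = {
    var attempts = 0
    val result = retryWithTimeout(timeoutMs = 1000, intervalMs = 10) {
      attempts += 1
      if (attempts < 3) throw new RuntimeException("flaky bind")
      "started"
    }
    println(s"$result after $attempts attempts")
  }
}
```

Note that the actual patch also stops and nulls out the half-constructed `kdc` in the catch block before rethrowing, so each retry starts from a clean state.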
### Why are the changes needed?
Fix test flakiness.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
The tests themselves, passing Jenkins.
Closes #28442 from yaooqinn/SPARK-31631.
Authored-by: Kent Yao <ya...@hotmail.com>
Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
.../HadoopDelegationTokenManagerSuite.scala | 30 ++++++++++++++++++++--
.../apache/spark/sql/kafka010/KafkaTestUtils.scala | 29 ++++++++++++++++++---
2 files changed, 54 insertions(+), 5 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index 275bca3..fc28968 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -19,10 +19,14 @@ package org.apache.spark.deploy.security
import java.security.PrivilegedExceptionAction
+import scala.util.control.NonFatal
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION
import org.apache.hadoop.minikdc.MiniKdc
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -88,8 +92,30 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
// krb5.conf. MiniKdc sets "java.security.krb5.conf" in start and removes it when stop called.
val kdcDir = Utils.createTempDir()
val kdcConf = MiniKdc.createConf()
- kdc = new MiniKdc(kdcConf, kdcDir)
- kdc.start()
+ // The port for the MiniKdc service is selected in the constructor, but is only bound
+ // later, in MiniKdc.start() -> MiniKdc.initKDCServer() -> KdcServer.start().
+ // In the meantime, some other service may grab the port, causing a BindException.
+ // This makes our tests that rely on MiniKdc flaky, even though they run in
+ // dedicated JVMs.
+ //
+ // https://issues.apache.org/jira/browse/HADOOP-12656 is fixed in Hadoop 2.8.0.
+ //
+ // Since we still use Hadoop 2.7.4 by default, the workaround here is to retry
+ // the whole process with a timeout.
+ // https://issues.apache.org/jira/browse/SPARK-31631
+ eventually(timeout(10.seconds), interval(1.second)) {
+ try {
+ kdc = new MiniKdc(kdcConf, kdcDir)
+ kdc.start()
+ } catch {
+ case NonFatal(e) =>
+ if (kdc != null) {
+ kdc.stop()
+ kdc = null
+ }
+ throw e
+ }
+ }
val krbConf = new Configuration()
krbConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos")
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 4f84619..e1e44b1 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -27,6 +27,7 @@ import javax.security.auth.login.Configuration
import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Random
+import scala.util.control.NonFatal
import com.google.common.io.Files
import kafka.api.Request
@@ -36,7 +37,7 @@ import kafka.zk.KafkaZkClient
import org.apache.hadoop.minikdc.MiniKdc
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions, NewTopic}
+import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
@@ -134,8 +135,30 @@ class KafkaTestUtils(
val kdcDir = Utils.createTempDir()
val kdcConf = MiniKdc.createConf()
kdcConf.setProperty(MiniKdc.DEBUG, "true")
- kdc = new MiniKdc(kdcConf, kdcDir)
- kdc.start()
+ // The port for the MiniKdc service is selected in the constructor, but is only bound
+ // later, in MiniKdc.start() -> MiniKdc.initKDCServer() -> KdcServer.start().
+ // In the meantime, some other service may grab the port, causing a BindException.
+ // This makes our tests that rely on MiniKdc flaky, even though they run in
+ // dedicated JVMs.
+ //
+ // https://issues.apache.org/jira/browse/HADOOP-12656 is fixed in Hadoop 2.8.0.
+ //
+ // Since we still use Hadoop 2.7.4 by default, the workaround here is to retry
+ // the whole process with a timeout.
+ // https://issues.apache.org/jira/browse/SPARK-31631
+ eventually(timeout(10.seconds), interval(1.second)) {
+ try {
+ kdc = new MiniKdc(kdcConf, kdcDir)
+ kdc.start()
+ } catch {
+ case NonFatal(e) =>
+ if (kdc != null) {
+ kdc.stop()
+ kdc = null
+ }
+ throw e
+ }
+ }
// TODO https://issues.apache.org/jira/browse/SPARK-30037
// Need to build spark's own MiniKDC and customize krb5.conf like Kafka
rewriteKrb5Conf()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org