You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Dongjoon Hyun (JIRA)" <ji...@apache.org> on 2018/01/26 18:06:00 UTC

[jira] [Updated] (SPARK-23239) KafkaRelationSuite should clean up its continuous queries

     [ https://issues.apache.org/jira/browse/SPARK-23239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-23239:
----------------------------------
    Description: 
Currently, `KafkaRelationSuite` doesn't clean up its continuous queries. As a result, the next suite seems to hang at 'processAllAvailable` sometimes.

{code}
...
KafkaRelationSuite:
- explicit earliest to latest offsets
- default starting and ending offsets
- explicit offsets
- reuse same dataframe in query
- test late binding start offsets
- bad batch query options
KafkaSourceStressSuite:
(hang here)
{code}

{code}
"stream execution thread for [id = d08d575a-513a-47e9-bdf5-3d3ee0e5eca8, runId = f2236eb2-2cbb-4069-a535-9d673a96a4f7]" #3723 daemon prio=5 os_prio=0 tid=0x00007f9dfe3e0e30 nid=0xa6c87 runnable [0x00007f9d62ff9000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x000000072def5c40> (a sun.nio.ch.Util$3)
        - locked <0x000000072def5c30> (a java.util.Collections$UnmodifiableSet)
        - locked <0x000000072def1f38> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.kafka.common.network.Selector.select(Selector.java:470)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
        - locked <0x000000072def1e78> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
        at org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:375)
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:342)
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:198)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
        at scala.collection.SetLike$class.map(SetLike.scala:92)
        at scala.collection.mutable.AbstractSet.map(Set.scala:46)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:197)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:189)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:280)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:278)
        - locked <0x000000072df2c308> (a org.apache.spark.sql.kafka010.KafkaOffsetReader)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:189)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:189)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:247)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:188)
        at org.apache.spark.sql.kafka010.KafkaSource.getOffset(KafkaSource.scala:150)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:267)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:267)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:266)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:263)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:263)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:127)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:120)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:120)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:120)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:116)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

"ScalaTest-main-running-KafkaSourceStressSuite" #1 prio=5 os_prio=0 tid=0x00007f9dfc013320 nid=0xa56ca waiting on condition [0x00007f9e02341000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000072df2c078> (a java.util.concurrent.locks.ReentrantLock$FairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantLock$FairSync.lock(ReentrantLock.java:224)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at org.apache.spark.sql.execution.streaming.StreamExecution.processAllAvailable(StreamExecution.scala:484)
        at org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData.addData(KafkaSourceSuite.scala:100)
        at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:602)
        at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:432)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:432)
        at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:431)
        - locked <0x000000070023de80> (a org.apache.spark.sql.kafka010.KafkaSourceStressSuite)
        at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaSourceSuite.scala:49)
        at org.apache.spark.sql.streaming.StreamTest$class.runStressTest(StreamTest.scala:761)
        at org.apache.spark.sql.kafka010.KafkaSourceTest.runStressTest(KafkaSourceSuite.scala:49)
        at org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$15.apply$mcV$sp(KafkaSourceSuite.scala:957)
        at org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$15.apply(KafkaSourceSuite.scala:937)
        at org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$15.apply(KafkaSourceSuite.scala:937)
        at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
        at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68)
        at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
        at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
        at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
        at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
        at org.apache.spark.sql.kafka010.KafkaSourceTest.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaSourceSuite.scala:49)
        at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
        at org.apache.spark.sql.kafka010.KafkaSourceTest.runTest(KafkaSourceSuite.scala:49)
        at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
        at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
        at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
        at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
        at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
        at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
        at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
        at org.scalatest.Suite$class.run(Suite.scala:1147)
        at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
        at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
        at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
        at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
        at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:31)
        at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
        at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
        at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:31)
        at org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1210)
        at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1257)
        at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1255)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.scalatest.Suite$class.runNestedSuites(Suite.scala:1255)
        at org.scalatest.tools.DiscoverySuite.runNestedSuites(DiscoverySuite.scala:30)
        at org.scalatest.Suite$class.run(Suite.scala:1144)
        at org.scalatest.tools.DiscoverySuite.run(DiscoverySuite.scala:30)
        at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
        at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
        at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
        at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
        at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
        at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
        at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
        at org.scalatest.tools.Runner$.main(Runner.scala:827)
        at org.scalatest.tools.Runner.main(Runner.scala)
{code}

  was:
Currently, `KafkaRelationSuite` doesn't clean up its continuous queries. As a result, the next suite seems to hang at 'processAllAvailable` sometimes.

*TEST SEQUENCE*
{code}
...
KafkaRelationSuite:
- explicit earliest to latest offsets
- default starting and ending offsets
- explicit offsets
- reuse same dataframe in query
- test late binding start offsets
- bad batch query options
KafkaSourceStressSuite:
(hang here)
{code}

*
{code}
"stream execution thread for [id = d08d575a-513a-47e9-bdf5-3d3ee0e5eca8, runId = f2236eb2-2cbb-4069-a535-9d673a96a4f7]" #3723 daemon prio=5 os_prio=0 tid=0x00007f9dfe3e0e30 nid=0xa6c87 runnable [0x00007f9d62ff9000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <0x000000072def5c40> (a sun.nio.ch.Util$3)
        - locked <0x000000072def5c30> (a java.util.Collections$UnmodifiableSet)
        - locked <0x000000072def1f38> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.kafka.common.network.Selector.select(Selector.java:470)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
        - locked <0x000000072def1e78> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
        at org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:375)
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:342)
        at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:198)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
        at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
        at scala.collection.SetLike$class.map(SetLike.scala:92)
        at scala.collection.mutable.AbstractSet.map(Set.scala:46)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:197)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:189)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:280)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:278)
        - locked <0x000000072df2c308> (a org.apache.spark.sql.kafka010.KafkaOffsetReader)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:189)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:189)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:247)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:188)
        at org.apache.spark.sql.kafka010.KafkaSource.getOffset(KafkaSource.scala:150)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:267)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:267)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:266)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:263)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:263)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:127)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:120)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:120)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:120)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:116)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

"ScalaTest-main-running-KafkaSourceStressSuite" #1 prio=5 os_prio=0 tid=0x00007f9dfc013320 nid=0xa56ca waiting on condition [0x00007f9e02341000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000072df2c078> (a java.util.concurrent.locks.ReentrantLock$FairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantLock$FairSync.lock(ReentrantLock.java:224)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at org.apache.spark.sql.execution.streaming.StreamExecution.processAllAvailable(StreamExecution.scala:484)
        at org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData.addData(KafkaSourceSuite.scala:100)
        at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:602)
        at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:432)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:432)
        at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:431)
        - locked <0x000000070023de80> (a org.apache.spark.sql.kafka010.KafkaSourceStressSuite)
        at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaSourceSuite.scala:49)
        at org.apache.spark.sql.streaming.StreamTest$class.runStressTest(StreamTest.scala:761)
        at org.apache.spark.sql.kafka010.KafkaSourceTest.runStressTest(KafkaSourceSuite.scala:49)
        at org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$15.apply$mcV$sp(KafkaSourceSuite.scala:957)
        at org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$15.apply(KafkaSourceSuite.scala:937)
        at org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$15.apply(KafkaSourceSuite.scala:937)
        at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
        at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68)
        at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
        at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
        at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
        at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
        at org.apache.spark.sql.kafka010.KafkaSourceTest.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaSourceSuite.scala:49)
        at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
        at org.apache.spark.sql.kafka010.KafkaSourceTest.runTest(KafkaSourceSuite.scala:49)
        at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
        at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
        at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
        at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
        at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
        at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
        at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
        at org.scalatest.Suite$class.run(Suite.scala:1147)
        at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
        at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
        at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
        at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
        at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:31)
        at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
        at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
        at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:31)
        at org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1210)
        at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1257)
        at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1255)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.scalatest.Suite$class.runNestedSuites(Suite.scala:1255)
        at org.scalatest.tools.DiscoverySuite.runNestedSuites(DiscoverySuite.scala:30)
        at org.scalatest.Suite$class.run(Suite.scala:1144)
        at org.scalatest.tools.DiscoverySuite.run(DiscoverySuite.scala:30)
        at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
        at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
        at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
        at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
        at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
        at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
        at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
        at org.scalatest.tools.Runner$.main(Runner.scala:827)
        at org.scalatest.tools.Runner.main(Runner.scala)
{code}


> KafkaRelationSuite should clean up its continuous queries
> ---------------------------------------------------------
>
>                 Key: SPARK-23239
>                 URL: https://issues.apache.org/jira/browse/SPARK-23239
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Dongjoon Hyun
>            Priority: Major
>
> Currently, `KafkaRelationSuite` doesn't clean up its continuous queries. As a result, the next suite seems to hang at 'processAllAvailable` sometimes.
> {code}
> ...
> KafkaRelationSuite:
> - explicit earliest to latest offsets
> - default starting and ending offsets
> - explicit offsets
> - reuse same dataframe in query
> - test late binding start offsets
> - bad batch query options
> KafkaSourceStressSuite:
> (hang here)
> {code}
> {code}
> "stream execution thread for [id = d08d575a-513a-47e9-bdf5-3d3ee0e5eca8, runId = f2236eb2-2cbb-4069-a535-9d673a96a4f7]" #3723 daemon prio=5 os_prio=0 tid=0x00007f9dfe3e0e30 nid=0xa6c87 runnable [0x00007f9d62ff9000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>         - locked <0x000000072def5c40> (a sun.nio.ch.Util$3)
>         - locked <0x000000072def5c30> (a java.util.Collections$UnmodifiableSet)
>         - locked <0x000000072def1f38> (a sun.nio.ch.EPollSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>         at org.apache.kafka.common.network.Selector.select(Selector.java:470)
>         at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>         - locked <0x000000072def1e78> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:375)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:342)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:198)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9$$anonfun$5.apply(KafkaOffsetReader.scala:197)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>         at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
>         at scala.collection.SetLike$class.map(SetLike.scala:92)
>         at scala.collection.mutable.AbstractSet.map(Set.scala:46)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:197)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:189)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:280)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:279)
>         at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:278)
>         - locked <0x000000072df2c308> (a org.apache.spark.sql.kafka010.KafkaOffsetReader)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:189)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:189)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:247)
>         at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:188)
>         at org.apache.spark.sql.kafka010.KafkaSource.getOffset(KafkaSource.scala:150)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:267)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5$$anonfun$apply$5.apply(MicroBatchExecution.scala:267)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:266)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$5.apply(MicroBatchExecution.scala:263)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:263)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:127)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:120)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:120)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:120)
>         at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:116)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> "ScalaTest-main-running-KafkaSourceStressSuite" #1 prio=5 os_prio=0 tid=0x00007f9dfc013320 nid=0xa56ca waiting on condition [0x00007f9e02341000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x000000072df2c078> (a java.util.concurrent.locks.ReentrantLock$FairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>         at java.util.concurrent.locks.ReentrantLock$FairSync.lock(ReentrantLock.java:224)
>         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.processAllAvailable(StreamExecution.scala:484)
>         at org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData.addData(KafkaSourceSuite.scala:100)
>         at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:602)
>         at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:432)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:432)
>         at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:431)
>         - locked <0x000000070023de80> (a org.apache.spark.sql.kafka010.KafkaSourceStressSuite)
>         at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaSourceSuite.scala:49)
>         at org.apache.spark.sql.streaming.StreamTest$class.runStressTest(StreamTest.scala:761)
>         at org.apache.spark.sql.kafka010.KafkaSourceTest.runStressTest(KafkaSourceSuite.scala:49)
>         at org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$15.apply$mcV$sp(KafkaSourceSuite.scala:957)
>         at org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$15.apply(KafkaSourceSuite.scala:937)
>         at org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$15.apply(KafkaSourceSuite.scala:937)
>         at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
>         at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>         at org.scalatest.Transformer.apply(Transformer.scala:22)
>         at org.scalatest.Transformer.apply(Transformer.scala:20)
>         at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
>         at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68)
>         at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
>         at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
>         at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
>         at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
>         at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
>         at org.apache.spark.sql.kafka010.KafkaSourceTest.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaSourceSuite.scala:49)
>         at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
>         at org.apache.spark.sql.kafka010.KafkaSourceTest.runTest(KafkaSourceSuite.scala:49)
>         at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
>         at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
>         at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
>         at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
>         at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
>         at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
>         at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
>         at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
>         at org.scalatest.Suite$class.run(Suite.scala:1147)
>         at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
>         at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
>         at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
>         at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
>         at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
>         at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:31)
>         at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
>         at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
>         at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:31)
>         at org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1210)
>         at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1257)
>         at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1255)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>         at org.scalatest.Suite$class.runNestedSuites(Suite.scala:1255)
>         at org.scalatest.tools.DiscoverySuite.runNestedSuites(DiscoverySuite.scala:30)
>         at org.scalatest.Suite$class.run(Suite.scala:1144)
>         at org.scalatest.tools.DiscoverySuite.run(DiscoverySuite.scala:30)
>         at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
>         at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
>         at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
>         at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
>         at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
>         at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
>         at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
>         at org.scalatest.tools.Runner$.main(Runner.scala:827)
>         at org.scalatest.tools.Runner.main(Runner.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org