Posted to user@spark.apache.org by Guillermo Ortiz <ko...@gmail.com> on 2018/04/03 22:03:13 UTC
Testing spark-testing-base. Error multiple SparkContext
I'm writing a test for a Spark Streaming job that uses Cassandra and Kafka.
I have an action that takes a DStream as input, saves some RDDs to Cassandra,
and sometimes puts some elements in Kafka.
I'm using https://github.com/holdenk/spark-testing-base, with Kafka and
Cassandra running locally.
My method looks like:
def execute(dstream: DStream[MyObject]): Unit = {
  // Some processing --> this works
  // Save some RDDs to Cassandra --> this works
  // Send some records to Kafka --> this doesn't work in the test,
  // it works outside of the test
}
When I send data to Kafka:
// There is an error in this method
def sendToKafka(rec: DStream[CrmTipoCliente]) = {
  rec.foreachRDD(r => {
    r.foreachPartition {
      // Config.kafkapr.... returns a Properties with the values to connect to Kafka
      val kafka = SparkKafkaSink[String, String](Config.kafkapropsProd) // --> Exception here
      partition => partition.foreach { message =>
        // Some logic..
        kafka.send("test", null, "message")
      }
    }
  })
}
My test looks like:
@RunWith(classOf[JUnitRunner])
class CassandraConnectionIntegrationTest extends FunSuite with BeforeAndAfter
    with BeforeAndAfterAll with StreamingActionBase {

  var cluster: Cluster = _
  implicit var session: Session = _
  val keyspace: String = "iris"
  val table: String = keyspace + ".tipo_cliente_ref"
  var service: MyClass = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    // This line doesn't work!
    sc.getConf.set("spark.driver.allowMultipleContexts", "true")
    ...
  }

  test("Insert record ") {
    val inputInsert = MyObject("...")
    val input = List(List(inputInsert))
    runAction[MyObject](input, service.execute)
    val result = session.execute("select * from myTable WHERE...")
    // Some asserts on Cassandra and Kafka
  }
}
This test partially works: it saves data into Cassandra, but it fails when
it has to send data to Kafka.
The error I see:
23:58:45.329 [pool-22-thread-1] INFO o.a.spark.streaming.CheckpointWriter - Saving checkpoint for time 1000 ms to file 'file:/C:/Users/A148681/AppData/Local/Temp/spark-cdf3229b-9d84-400f-b92a-5ff4086b81c3/checkpoint-1000'
Exception in thread "streaming-job-executor-0" java.lang.ExceptionInInitializerError
        at com.example.streaming.CrmTipoClienteRunner$$anonfun$monitKafkaToCassandra$1.apply(MyClass.scala:49)
        at com.example.streaming.CrmTipoClienteRunner$$anonfun$monitKafkaToCassandra$1.apply(MyClass.scala:47)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
com.holdenkarau.spark.testing.SharedSparkContext$class.beforeAll(SharedSparkContext.scala:45)
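As far as I understand, the line I added in beforeAll can't work for two reasons: sc.getConf returns a copy of the context's conf, and spark.driver.allowMultipleContexts is only read when a SparkContext is constructed, which super.beforeAll() (SharedSparkContext) has already done by then. A toy model of the getConf copy behaviour, in plain Scala with no Spark (TinyConf is a made-up class, just to illustrate):

```scala
// Toy model: like SparkContext.getConf, copy() hands back a clone, so
// setting a key on it never reaches the "running" configuration.
final class TinyConf(private var settings: Map[String, String] = Map.empty) {
  def set(k: String, v: String): TinyConf = { settings += (k -> v); this }
  def get(k: String, default: String): String = settings.getOrElse(k, default)
  def copy(): TinyConf = new TinyConf(settings) // what sc.getConf does
}

object ConfDemo {
  def main(args: Array[String]): Unit = {
    val running = new TinyConf().set("spark.master", "local[*]")
    val snapshot = running.copy() // like sc.getConf
    snapshot.set("spark.driver.allowMultipleContexts", "true")
    // The running conf never sees the change:
    println(running.get("spark.driver.allowMultipleContexts", "false")) // prints "false"
  }
}
```

So is there a way to make the Kafka part of the action work under StreamingActionBase, given that the library has already created the shared SparkContext before my code runs?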