Posted to dev@spark.apache.org by kevin <ki...@gmail.com> on 2016/07/26 03:25:12 UTC

spark 2.0: how to use SparkSession and StreamingContext at the same time

Hi all,
I want to read data from Kafka, register it as a table, and then join it with a JDBC table.
My sample looks like this:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // sparkConf, zkQuorum, group, topics and numThreads are defined earlier in the POC.
    val spark = SparkSession
      .builder
      .config(sparkConf)
      .getOrCreate()

    val jdbcDF = spark.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://master1:3306/demo",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "i_user",
      "user" -> "root",
      "password" -> "passok")).load()
    jdbcDF.cache().createOrReplaceTempView("black_book")
    val df = spark.sql("select * from black_book")
    df.show()

    // This constructs a second SparkContext from sparkConf -- see the error below.
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))

*I got this error:*

16/07/26 11:18:07 WARN AbstractHandler: No Server set for
org.spark_project.jetty.server.handler.ErrorHandler@6f0ca692
+--------------------+--------+--------+
|                  id|username|password|
+--------------------+--------+--------+
|e6faca36-8766-4dc...|       a|       a|
|699285a3-a108-457...|   admin|     123|
|e734752d-ac98-483...|    test|    test|
|c0245226-128d-487...|   test2|   test2|
|4f1bbdb2-89d1-4cc...|     119|     911|
|16a9a360-13ee-4b5...|    1215|    1215|
|bf7d6a0d-2949-4c3...|   demo3|   demo3|
|de30747c-c466-404...|     why|     why|
|644741c9-8fd7-4a5...|   scala|       p|
|cda1e44d-af4b-461...|     123|     231|
|6e409ed9-c09b-4e7...|     798|      23|
+--------------------+--------+--------+

Exception in thread "main" org.apache.spark.SparkException: Only one
SparkContext may be running in this JVM (see SPARK-2243). To ignore this
error, set spark.driver.allowMultipleContexts = true. The currently running
SparkContext was created at:
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:749)
main.POC$.main(POC.scala:43)
main.POC.main(POC.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2211)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2207)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2207)
at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2277)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:91)
at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:837)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:84)
at main.POC$.main(POC.scala:50)
at main.POC.main(POC.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:724)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
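
The trace pinpoints the problem: SparkSession.Builder.getOrCreate (POC.scala:43) already started a SparkContext, and the StreamingContext constructor (POC.scala:50) then tried to create a second one in the same JVM. A minimal sketch of the collision, using an illustrative SparkConf (the app name and master here are placeholders, not values from the POC):

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Illustrative conf; the app name and master are placeholders.
    val sparkConf = new SparkConf().setAppName("poc").setMaster("local[2]")

    // getOrCreate() starts a SparkContext in this JVM ...
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()

    // ... and this constructor always builds a *new* SparkContext from
    // the conf, so it trips the one-context-per-JVM check (SPARK-2243)
    // and throws "Only one SparkContext may be running in this JVM".
    val ssc = new StreamingContext(sparkConf, Seconds(2))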

Re: spark 2.0: how to use SparkSession and StreamingContext at the same time

Posted by kevin <ki...@gmail.com>.
Thanks a lot, Terry.

2016-07-26 12:03 GMT+08:00 Terry Hoo <hu...@gmail.com>:

> Kevin,
>
> Try creating the StreamingContext as follows:
>
> val ssc = new StreamingContext(spark.sparkContext, Seconds(2))


Re: spark 2.0: how to use SparkSession and StreamingContext at the same time

Posted by Terry Hoo <hu...@gmail.com>.
Kevin,

Try creating the StreamingContext as follows:

val ssc = new StreamingContext(spark.sparkContext, Seconds(2))

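For reference, a minimal end-to-end sketch of this fix, reusing the SparkContext the SparkSession already owns. The Kafka/ZooKeeper settings and the join condition are illustrative placeholders rather than values from the original POC:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val sparkConf = new SparkConf().setAppName("poc")
    val spark = SparkSession.builder.config(sparkConf).getOrCreate()

    val jdbcDF = spark.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://master1:3306/demo",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "i_user",
      "user" -> "root",
      "password" -> "passok")).load()
    jdbcDF.cache().createOrReplaceTempView("black_book")

    // Reuse the existing SparkContext instead of building a second one.
    val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
    ssc.checkpoint("checkpoint")

    // Placeholder Kafka/ZooKeeper settings; substitute your own.
    val (zkQuorum, group, topics, numThreads) = ("master1:2181", "g1", "demo", "1")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // Register each micro-batch as a temp view and join it with the
    // cached JDBC table through the same SparkSession.
    lines.foreachRDD { rdd =>
      import spark.implicits._
      rdd.flatMap(_.split(" ")).toDF("word").createOrReplaceTempView("words")
      spark.sql(
        "select * from words w join black_book b on w.word = b.username").show()
    }

    ssc.start()
    ssc.awaitTermination()

Reusing spark.sparkContext also means the streaming job and the SQL queries share one set of executors and one web UI, instead of fighting over a second context.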

