Posted to dev@zeppelin.apache.org by "Chandana Kithalagama (JIRA)" <ji...@apache.org> on 2019/03/18 15:59:00 UTC
[jira] [Created] (ZEPPELIN-4079) SparkShims.getNoteId returns NPE
Chandana Kithalagama created ZEPPELIN-4079:
----------------------------------------------
Summary: SparkShims.getNoteId returns NPE
Key: ZEPPELIN-4079
URL: https://issues.apache.org/jira/browse/ZEPPELIN-4079
Project: Zeppelin
Issue Type: Bug
Components: zeppelin-interpreter
Affects Versions: 0.8.1
Environment: zeppelin-0.8.1-bin-netinst running on a local machine,
spark interpreter,
2 external dependencies added to the spark interpreter configs:
- org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0
- com.typesafe.play:play-json_2.11:2.6.8
Connecting to Spark 2.4.0 running on a local machine
$ java -version
java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
Reporter: Chandana Kithalagama
When I run the following Scala program in a Zeppelin notebook, an NPE is logged in the logs/zeppelin-interpreter-spark-<name>.log file.
{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.kafka.common.serialization.StringDeserializer
import play.api.libs.json._

val PREFIX = "CK-LOG ====------> "

case class SenseData(hash: String, value: Float, updated: String)

/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "sensor_data-2019",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("plugin_topic_536cc303-eb2f-4ff9-b546-d8c59b6c5466")

val streamingContext = new StreamingContext(sc, Seconds(60))
println(PREFIX + "streamContext created")

val stream = KafkaUtils.createDirectStream(
  streamingContext,
  PreferBrokers,
  Subscribe[String, String](topics, kafkaParams)
)
println(PREFIX + "DStream created")

// val msgs = stream.window(Seconds(10))
stream.map(record => {
  // Parse the Kafka message value and extract the sensor fields
  val json: JsValue = Json.parse(record.value)
  val notification = json("b")("notification")
  SenseData(
    notification("deviceId").as[String],
    notification("parameters")("temperature").as[Float],
    notification("timestamp").as[String])
}).foreachRDD((rdd: RDD[SenseData], time: Time) => {
  println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")

  // Get the singleton instance of SparkSession
  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  import spark.implicits._

  // Print the RDD's elements on the driver, see
  // https://spark.apache.org/docs/latest/rdd-programming-guide.html#printing-elements-of-an-rdd
  // rdd.take(100).foreach(println)
  rdd.collect().foreach(println)

  // rdd.toDF() won't work without a SparkSession and its imported implicits, see
  // https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
  rdd.toDF().createOrReplaceTempView("sensedata")
  val senseDataDF = spark.sql("select value, updated from sensedata")
  println(s"========= $time =========")
  senseDataDF.show()
})

streamingContext.start()
{code}
Error:
{code:java}
ERROR [2019-03-18 17:02:00,021] ({spark-listener-group-shared} Logging.scala[logError]:91) - Listener threw an exception
java.lang.NullPointerException
at org.apache.zeppelin.spark.SparkShims.getNoteId(SparkShims.java:96)
at org.apache.zeppelin.spark.SparkShims.buildSparkJobUrl(SparkShims.java:117)
at org.apache.zeppelin.spark.Spark2Shims$1.onJobStart(Spark2Shims.java:44)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82){code}
This issue looks related to [ZEPPELIN-3242|https://jira.apache.org/jira/browse/ZEPPELIN-3242], which was marked as fixed in 0.8.0.
The exception is thrown at [line 96|https://github.com/apache/zeppelin/blob/c46f55f1d81df944fd1b69a7ccb68d0647294543/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java#L96] of SparkShims.java because _jobgroupid_ is null. It is null because [line 109|https://github.com/apache/zeppelin/blob/c46f55f1d81df944fd1b69a7ccb68d0647294543/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java#L109] finds no _spark.jobGroup.id_ property, which is supposed to be set/passed when the job [starts|https://github.com/apache/zeppelin/blob/c46f55f1d81df944fd1b69a7ccb68d0647294543/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java#L41]. I am not sure why it is not passed, or why it is not taken from the interpreter properties.
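For illustration, here is a minimal Java sketch of note-id parsing that tolerates a missing job group. It is hypothetical: `JobGroupIdSketch` and `extractNoteId` are illustrative names, not Zeppelin's API, and the `zeppelin-<noteId>-<paragraphId>` job group format is an assumption. Dereferencing a null id is what raises an NPE like the one at line 96; returning `Optional.empty()` instead would let a caller decide what to do.

```java
import java.util.Optional;
import java.util.Properties;

/**
 * Hypothetical, null-safe note-id parsing (NOT the actual Zeppelin source).
 * Assumption: job group ids look like "zeppelin-<noteId>-<paragraphId>".
 */
final class JobGroupIdSketch {

  // Local-property key Spark uses for the job group (the property named in this report).
  static final String JOB_GROUP_KEY = "spark.jobGroup.id";

  /** Returns the note id, or empty if the job group is missing or malformed. */
  static Optional<String> extractNoteId(Properties jobProperties) {
    if (jobProperties == null) {
      return Optional.empty();
    }
    String jobGroupId = jobProperties.getProperty(JOB_GROUP_KEY);
    if (jobGroupId == null) {
      // Job carries no job group, as with the streaming batches in this report.
      return Optional.empty();
    }
    int first = jobGroupId.indexOf('-');
    int second = jobGroupId.indexOf('-', first + 1);
    if (first < 0 || second < 0) {
      return Optional.empty();
    }
    // The note id sits between the first and second '-'.
    return Optional.of(jobGroupId.substring(first + 1, second));
  }

  public static void main(String[] args) {
    Properties fromParagraph = new Properties();
    fromParagraph.setProperty(JOB_GROUP_KEY, "zeppelin-2E6Z8XYZA-20190318-170200_123");
    Properties withoutGroup = new Properties(); // no job group set

    System.out.println(extractNoteId(fromParagraph).orElse("<none>")); // 2E6Z8XYZA
    System.out.println(extractNoteId(withoutGroup).orElse("<none>"));  // <none>
  }
}
```

With a guard like this, a job-start listener could skip building the job URL for jobs that carry no job group instead of throwing.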
Setting the 'spark.jobGroup.id' property to an arbitrary string under Zeppelin Interpreters -> Spark -> Properties did not fix it either.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)