Posted to dev@zeppelin.apache.org by "Chandana Kithalagama (JIRA)" <ji...@apache.org> on 2019/03/18 15:59:00 UTC

[jira] [Created] (ZEPPELIN-4079) SparkShims.getNoteId returns NPE

Chandana Kithalagama created ZEPPELIN-4079:
----------------------------------------------

             Summary: SparkShims.getNoteId returns NPE
                 Key: ZEPPELIN-4079
                 URL: https://issues.apache.org/jira/browse/ZEPPELIN-4079
             Project: Zeppelin
          Issue Type: Bug
          Components: zeppelin-interpreter
    Affects Versions: 0.8.1
         Environment: zeppelin-0.8.1-bin-netinst running on local machine,

spark interpreter,

2 external dependencies added to the spark interpreter configs:
 - org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0  

 - com.typesafe.play:play-json_2.11:2.6.8

Connecting to Spark 2.4.0 running on local machine

$ java -version

java version "1.8.0_191"
Java(TM) SE Runtime Environment (build 1.8.0_191-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.191-b12, mixed mode)
            Reporter: Chandana Kithalagama


When I run the following Scala program in a Zeppelin notebook, an NPE is shown in the logs/zeppelin-interpreter-spark-<name>.log file.
{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

import org.apache.kafka.common.serialization.StringDeserializer

import play.api.libs.json._

val PREFIX = "CK-LOG ====------> "

case class SenseData(hash: String, value: Float, updated: String)

/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {

  @transient  private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "sensor_data-2019",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("plugin_topic_536cc303-eb2f-4ff9-b546-d8c59b6c5466")

val streamingContext = new StreamingContext(sc, Seconds(60))
println(PREFIX + "streamContext created")

val stream = KafkaUtils.createDirectStream(
  streamingContext,
  PreferBrokers,
  Subscribe[String, String](topics, kafkaParams)
)
println(PREFIX + "DStream created")

// val msgs = stream.window(Seconds(10))
stream.map(
  record => {
    var json: JsValue = Json.parse(record.value)
    SenseData(json("b")("notification")("deviceId").as[String], json("b")("notification")("parameters")("temperature").as[Float], json("b")("notification")("timestamp").as[String])
  }
).foreachRDD( (rdd: RDD[SenseData], time: Time) => {
  println("--- New RDD with " + rdd.partitions.size + " partitions and " + rdd.count + " records")

  // Get the singleton instance of SparkSession
  val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
  import spark.implicits._

  // this is how to print the elements of the RDD, per:
  // https://spark.apache.org/docs/latest/rdd-programming-guide.html#printing-elements-of-an-rdd
  // rdd.take(100).foreach(println)
  rdd.collect().foreach(println)

  // rdd.toDF() won't work without spark session and imported implicits
  // https://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations
  rdd.toDF().createOrReplaceTempView("sensedata")

  val senseDataDF =
    spark.sql("select value, updated from sensedata")
  println(s"========= $time =========")
  senseDataDF.show()
}
)

streamingContext.start(){code}
Error:
{code:java}
ERROR [2019-03-18 17:02:00,021] ({spark-listener-group-shared} Logging.scala[logError]:91) - Listener threw an exception
java.lang.NullPointerException
at org.apache.zeppelin.spark.SparkShims.getNoteId(SparkShims.java:96)
at org.apache.zeppelin.spark.SparkShims.buildSparkJobUrl(SparkShims.java:117)
at org.apache.zeppelin.spark.Spark2Shims$1.onJobStart(Spark2Shims.java:44)
at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:92)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82){code}
It looks like this issue is related to [ZEPPELIN-3242|https://jira.apache.org/jira/browse/ZEPPELIN-3242], which was fixed in 0.8.0.

The issue is at [line 96|https://github.com/apache/zeppelin/blob/c46f55f1d81df944fd1b69a7ccb68d0647294543/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java#L96] of SparkShims.java and is caused by _jobgroupid_ being null, which in turn comes from a missing _spark.jobGroup.id_ property ([line 109|https://github.com/apache/zeppelin/blob/c46f55f1d81df944fd1b69a7ccb68d0647294543/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java#L109]). That property is set/passed during job [start|https://github.com/apache/zeppelin/blob/c46f55f1d81df944fd1b69a7ccb68d0647294543/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java#L41]. I am not sure why it is not passed, or not taken from the interpreter properties.
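To confirm which jobs reach the listener without a job group, a small diagnostic listener can be registered from a notebook paragraph. This is only an illustrative sketch (it assumes the stock Spark interpreter, where _sc_ is the predefined SparkContext); it just prints the _spark.jobGroup.id_ each job carries, so the streaming micro-batch jobs can be told apart from ordinary paragraph jobs.
{code:scala}
// Diagnostic sketch only: print the job group id seen by a SparkListener,
// the same value SparkShims.getNoteId later tries to parse.
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

val jobGroupProbe = new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // jobStart.properties may be null, and "spark.jobGroup.id" may be absent,
    // e.g. for jobs submitted by Spark Streaming's scheduler rather than
    // directly from a notebook paragraph.
    val group = Option(jobStart.properties)
      .map(_.getProperty("spark.jobGroup.id"))
      .orNull
    println(s"jobId=${jobStart.jobId} spark.jobGroup.id=$group")
  }
}

sc.addSparkListener(jobGroupProbe)
{code}
If the streaming batch jobs print null here while normal paragraph jobs print a Zeppelin-style group id, that matches the NPE above.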

Setting the 'spark.jobGroup.id' property to an arbitrary string under Zeppelin's Interpreters -> Spark -> Properties didn't sort it out either.
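Another experiment one could try (sketch only, not a confirmed fix) is setting the job group from the paragraph itself before the StreamingContext is created, instead of via interpreter properties. SparkContext.setJobGroup sets _spark.jobGroup.id_ as a thread-local property, so it is unclear whether jobs submitted by the streaming scheduler would inherit it, and the group id string below is a made-up placeholder rather than the exact format Zeppelin expects.
{code:scala}
// Experiment sketch only: supply a job group id from the paragraph itself,
// before the StreamingContext is created and started.
// "zeppelin-manual-group" is an arbitrary placeholder; Zeppelin parses a
// note id out of this string, so the real format it expects likely matters.
sc.setJobGroup("zeppelin-manual-group", "kafka streaming paragraph")

// ... then create the StreamingContext and the Kafka DStream exactly as
// in the program above, and call streamingContext.start() ...
{code}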



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)