Posted to user@spark.apache.org by Kuttaiah Robin <ku...@gmail.com> on 2018/10/14 09:02:40 UTC

Redeploying Spark streaming application aborts because of a checkpoint issue

Hello all,

I am using Spark 2.3.0 and Hadoop 2.7.4.
I have a Spark Structured Streaming application that listens to a Kafka
topic, does some transformations, and writes to an Oracle database through a
JDBC client.

Events are read from Kafka as shown below:

        m_oKafkaEvents = getSparkSession().readStream().format("kafka")
          .option("kafka.bootstrap.servers", strKafkaAddress)
          .option("assign", strSubscription)
          .option("maxOffsetsPerTrigger", "100000")
          .option("startingOffsets", "latest")
          .option("failOnDataLoss", false)
          .load()
          .filter(strFilter)
          .select(functions.from_json(functions.col("value").cast("string"), oSchema).alias("events"))
          .select("events.*");

Checkpointing is configured as shown below:

DataStreamWriter<Row> oMilestoneStream = oAggregation
      .writeStream()
      .queryName(strQueryName)
      .outputMode("update")
      .trigger(Trigger.ProcessingTime(getInsightDeployment().getInstanceSummary().getTriggerInterval()))
      .option("checkpointLocation", strCheckpointLocation)
      .foreach(oForeachWriter);

strCheckpointLocation is an HDFS path, something
like /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj.
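
For completeness, oForeachWriter is a ForeachWriter<Row> that writes each row
to Oracle over JDBC. The sketch below is only an approximation of it; the
connection URL, credentials, table, and column names are placeholders, not
the real implementation:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import org.apache.spark.sql.ForeachWriter;
    import org.apache.spark.sql.Row;

    // Sketch only: URL, credentials, table and column names are placeholders.
    public class OracleForeachWriter extends ForeachWriter<Row> {
      private Connection conn;
      private PreparedStatement stmt;

      @Override
      public boolean open(long partitionId, long version) {
        try {
          conn = DriverManager.getConnection(
              "jdbc:oracle:thin:@//db-host:1521/ORCL", "app_user", "app_password");
          stmt = conn.prepareStatement(
              "INSERT INTO instance_summary (event_id, payload) VALUES (?, ?)");
          return true; // accept all rows of this partition/epoch
        } catch (Exception e) {
          throw new RuntimeException("Could not open JDBC connection", e);
        }
      }

      @Override
      public void process(Row row) {
        try {
          stmt.setString(1, row.getString(0));
          stmt.setString(2, row.getString(1));
          stmt.executeUpdate();
        } catch (Exception e) {
          throw new RuntimeException("Failed to write row to Oracle", e);
        }
      }

      @Override
      public void close(Throwable errorOrNull) {
        try {
          if (stmt != null) stmt.close();
          if (conn != null) conn.close();
        } catch (Exception ignored) {
          // best-effort cleanup
        }
      }
    }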


With this setup, when I redeploy the Spark application it aborts with the
exception below. The only workaround I currently have is to delete the
checkpoint location and recreate the Kafka topic.
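
Deleting the checkpoint location amounts to a recursive delete of the
directory on HDFS; a minimal sketch of that step with the Hadoop FileSystem
API, using the same URI that appears in the exception below:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Workaround only, not a fix: recursively delete the checkpoint directory
    // before redeploying (URI and path taken from the exception below).
    FileSystem fs = FileSystem.get(URI.create("hdfs://hdfs-name:9000"), new Configuration());
    fs.delete(new Path("/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj"), true);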

I also found a couple of JIRA issues that are marked RESOLVED, but the
problem is still seen:
https://issues.apache.org/jira/browse/SPARK-20894
https://issues.apache.org/jira/browse/SPARK-22262

Can someone advise on the best way to solve this?

thanks,
Robin Kuttaiah



Exception
-------
18/10/14 03:19:16 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalStateException: Error reading delta file hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta of HDFSStateStoreProvider[id = (op=1,part=0),dir = hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0]: hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta does not exist
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:371)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:333)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:332)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:196)
        at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:369)
        at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:74)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
Caused by: java.io.FileNotFoundException: File does not exist: /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1836)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1808)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1723)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:366)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
        at java.security.AccessController.doPrivileged(Native Method)