Posted to user@spark.apache.org by Kuttaiah Robin <ku...@gmail.com> on 2018/10/14 09:02:40 UTC
Redeploying spark streaming application aborts because of checkpoint issue
Hello all,
I am using Spark 2.3.0 and Hadoop 2.7.4.
I have a Spark Structured Streaming application that listens to a Kafka topic, does some transformation, and writes to an Oracle database through a JDBC client.
Events are read from Kafka as shown below:
m_oKafkaEvents = getSparkSession().readStream().format("kafka")
    .option("kafka.bootstrap.servers", strKafkaAddress)
    .option("assign", strSubscription)
    .option("maxOffsetsPerTrigger", "100000")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", false)
    .load()
    .filter(strFilter)
    .select(functions.from_json(functions.col("value").cast("string"), oSchema).alias("events"))
    .select("events.*");
The checkpoint location is configured as shown below:
DataStreamWriter<Row> oMilestoneStream = oAggregation
    .writeStream()
    .queryName(strQueryName)
    .outputMode("update")
    .trigger(Trigger.ProcessingTime(getInsightDeployment().getInstanceSummary().getTriggerInterval()))
    .option("checkpointLocation", strCheckpointLocation)
    .foreach(oForeachWriter);
strCheckpointLocation is an HDFS path, something
like /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj.
With this setup, whenever I redeploy the Spark application the job aborts with the exception below. The only
workaround I have found so far is to delete the checkpoint location and
recreate the Kafka topic.
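In case it is not clear, the workaround amounts to roughly the following commands. The checkpoint path is the one from my deployment; the broker address, topic name, and partition/replication settings here are illustrative placeholders, not my actual configuration:

```shell
# Stop the streaming query first, then delete the checkpoint directory on HDFS.
# This discards all streaming state and stored Kafka offsets.
hdfs dfs -rm -r -skipTrash /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj

# Recreate the Kafka topic (names and sizing are placeholders).
# On Kafka versions before 2.2, use --zookeeper <zk-host>:2181 instead of
# --bootstrap-server for kafka-topics.sh.
kafka-topics.sh --delete --bootstrap-server kafka-broker:9092 --topic opportunity-events
kafka-topics.sh --create --bootstrap-server kafka-broker:9092 --topic opportunity-events \
  --partitions 3 --replication-factor 1
```

After this, the application starts cleanly again with startingOffsets=latest, but obviously all state and in-flight events are lost, which is why I am looking for a better solution.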
I also see a couple of JIRA issues that are marked RESOLVED, but the problem is
still seen:
https://issues.apache.org/jira/browse/SPARK-20894
https://issues.apache.org/jira/browse/SPARK-22262
Can someone advise on the best solution for this?
thanks,
Robin Kuttaiah
Exception
-------
18/10/14 03:19:16 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalStateException: Error reading delta file hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta of HDFSStateStoreProvider[id = (op=1,part=0),dir = hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0]: hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta does not exist
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:371)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:333)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
    at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:332)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:196)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:369)
    at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:74)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
Caused by: java.io.FileNotFoundException: File does not exist: /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1836)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1808)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1723)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:366)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
    at java.security.AccessController.doPrivileged(Native Method)