Posted to dev@spark.apache.org by kant kodali <ka...@gmail.com> on 2017/05/24 22:35:22 UTC

Running into the same problem as JIRA SPARK-19268

Hi All,

I am using Spark 2.1.1, running in Standalone mode with HDFS and Kafka.

I am running into the same problem as
https://issues.apache.org/jira/browse/SPARK-19268 with my app (not
KafkaWordCount).

Here is my sample code

*Here is how I create ReadStream*

sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
                .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
                .option("startingOffsets", "earliest")
                .option("failOnDataLoss", "false")
                .option("checkpointLocation", hdfsCheckPointDir)
                .load();


*The core logic*

Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
query.awaitTermination();
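
KafkaSink above is our custom ForeachWriter. For context, a minimal sketch of
its shape (this is just a placeholder skeleton, not the actual implementation;
the Kafka producer wiring is omitted) would look roughly like this:

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

// Hypothetical skeleton of the KafkaSink referenced above.
public class KafkaSink extends ForeachWriter<Row> {
    @Override
    public boolean open(long partitionId, long version) {
        // acquire a Kafka producer for this partition; return true to process its rows
        return true;
    }

    @Override
    public void process(Row row) {
        // serialize the row and publish it to the output topic
    }

    @Override
    public void close(Throwable errorOrNull) {
        // flush and release the producer
    }
}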


I can also provide any other information you may need.

Thanks!

Re: Running into the same problem as JIRA SPARK-19268

Posted by kant kodali <ka...@gmail.com>.
https://issues.apache.org/jira/browse/SPARK-20894

On Thu, May 25, 2017 at 4:31 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:

> I don't know what happened in your case, so I can't suggest a workaround.
> It would be great if you could provide the logs output by
> HDFSBackedStateStoreProvider.
>
> On Thu, May 25, 2017 at 4:05 PM, kant kodali <ka...@gmail.com> wrote:
>
>>
>> On Thu, May 25, 2017 at 3:41 PM, Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>>> bin/hadoop fs -ls /usr/local/hadoop/checkpoint/state/0/*
>>>
>>
>> Hi,
>>
>> There are no files under /usr/local/hadoop/checkpoint/state/0/* (per bin/hadoop fs -ls),
>> but all the directories up to /usr/local/hadoop/checkpoint/state/0 do
>> exist (they were created by Spark).
>>
>> Yes, I can attach the log, but it looks pretty much the same as what I sent
>> earlier in this thread.
>>
>> Is there any workaround for this for now? I will create a ticket shortly.
>>
>> Thanks!
>>
>
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
I don't know what happened in your case, so I can't suggest a workaround.
It would be great if you could provide the logs output by
HDFSBackedStateStoreProvider.
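
In case it helps, one way to capture those logs (assuming the default log4j
setup that ships with Spark; adjust to whatever logging config you use) is to
add a line like the following to conf/log4j.properties on the driver and
executors:

log4j.logger.org.apache.spark.sql.execution.streaming.state=DEBUG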

On Thu, May 25, 2017 at 4:05 PM, kant kodali <ka...@gmail.com> wrote:

>
> On Thu, May 25, 2017 at 3:41 PM, Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> bin/hadoop fs -ls /usr/local/hadoop/checkpoint/state/0/*
>>
>
> Hi,
>
> There are no files under /usr/local/hadoop/checkpoint/state/0/* (per bin/hadoop fs -ls),
> but all the directories up to /usr/local/hadoop/checkpoint/state/0 do
> exist (they were created by Spark).
>
> Yes, I can attach the log, but it looks pretty much the same as what I sent
> earlier in this thread.
>
> Is there any workaround for this for now? I will create a ticket shortly.
>
> Thanks!
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by kant kodali <ka...@gmail.com>.
On Thu, May 25, 2017 at 3:41 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:

> bin/hadoop fs -ls /usr/local/hadoop/checkpoint/state/0/*
>

Hi,

There are no files under /usr/local/hadoop/checkpoint/state/0/* (per
bin/hadoop fs -ls), but all the directories up to
/usr/local/hadoop/checkpoint/state/0 do exist (they were created by Spark).

Yes, I can attach the log, but it looks pretty much the same as what I sent
earlier in this thread.

Is there any workaround for this for now? I will create a ticket shortly.

Thanks!

Re: Running into the same problem as JIRA SPARK-19268

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Feel free to create a new ticket. Could you also attach the listing of the files in
"/usr/local/hadoop/checkpoint/state/0" (just run "bin/hadoop fs -ls
/usr/local/hadoop/checkpoint/state/0/*") and the Spark logs to the ticket?

On Thu, May 25, 2017 at 2:53 PM, kant kodali <ka...@gmail.com> wrote:

> Should I file a ticket or should I try another version like Spark 2.2
> since I am currently using 2.1.1?
>
> On Thu, May 25, 2017 at 2:38 PM, kant kodali <ka...@gmail.com> wrote:
>
>> Hi Ryan,
>>
>> You are right, I was setting checkpointLocation on readStream. Now I have
>> set it for writeStream as well, like below:
>>
>> StreamingQuery query = df2.writeStream().foreach(new KafkaSink())
>>         .option("checkpointLocation", "/usr/local/hadoop/checkpoint")
>>         .outputMode("update").start();
>>
>> query.awaitTermination();
>>
>> *and now I can at very least see there are directories like*
>>
>> -rw-r--r--   2 ubuntu supergroup         45 2017-05-25 21:29 /usr/local/hadoop/checkpoint/metadata
>> drwxr-xr-x   - ubuntu supergroup          0 2017-05-25 21:30 /usr/local/hadoop/checkpoint/offsets
>> drwxr-xr-x   - ubuntu supergroup          0 2017-05-25 21:29 /usr/local/hadoop/checkpoint/sources
>> drwxr-xr-x   - ubuntu supergroup          0 2017-05-25 21:30 /usr/local/hadoop/checkpoint/state
>>
>>
>> However it still fails with
>>
>> *org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /usr/local/hadoop/checkpoint/state/0/1/1.delta*
>>
>>
>>
>> On Thu, May 25, 2017 at 2:31 PM, Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>>> Read your code again and found one issue: you set "checkpointLocation"
>>> in `readStream`. It should be set in `writeStream`. However, I still have
>>> no idea why using a temp checkpoint location would fail.
>>>
>>> On Thu, May 25, 2017 at 2:23 PM, kant kodali <ka...@gmail.com> wrote:
>>>
>>>> I did the following
>>>>
>>>> *bin/hadoop fs -mkdir -p **/usr/local/hadoop/checkpoint* and did *bin/hadoop
>>>> fs -ls / *
>>>>
>>>> and I can actually see */tmp* and */usr* and inside of */usr *there is
>>>> indeed *local/hadoop/checkpoint. *
>>>>
>>>> So until here it looks fine.
>>>>
>>>> I also cleared everything */tmp/** as @Michael suggested using *bin/hadoop
>>>> fs -rmdir * such that when I do *bin/hadoop fs -ls /tmp *I don't see
>>>> anything.
>>>>
>>>> Now I ran my Spark driver program using spark-submit and it failed with the
>>>> following exception:
>>>>
>>>> *File does not exist:
>>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*
>>>>
>>>> so I did *bin/hadoop fs -ls *
>>>> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2 *
>>>>
>>>> and I did not see anything there like *1.delta(there are just no
>>>> files)* however all these directories */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
>>>>  *do exist.
>>>>
>>>> For my checkPointLocation I had passed   */usr/local/hadoop/checkpoint
>>>> *and  *hdfs://<namenode:port>/usr/local/hadoop/checkpoint  *so far and
>>>> both didn't work for me. It is failing with the same error "*File does
>>>> not exist:
>>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*"
>>>>
>>>> so what can be the problem? any ideas?
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, May 25, 2017 at 1:31 AM, kant kodali <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>>>>>
>>>>> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>>>>>
>>>>> This is what I expected as well, since I don't see any checkpoint
>>>>> directory under /usr/local/hadoop. Am I missing any configuration variable
>>>>> like HADOOP_CONF_DIR? I am currently not setting that in
>>>>> conf/spark-env.sh, and that's the only Hadoop-related environment variable I
>>>>> see. Please let me know.
>>>>>
>>>>> thanks!
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 25, 2017 at 1:19 AM, kant kodali <ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Ryan,
>>>>>>
>>>>>> I did add that print statement and here is what I got.
>>>>>>
>>>>>> class org.apache.hadoop.hdfs.DistributedFileSystem
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
>>>>>> shixiong@databricks.com> wrote:
>>>>>>
>>>>>>> I meant using an HDFS command to check the directory, such as
>>>>>>> "bin/hadoop fs -ls /usr/local/hadoop/checkpoint". My hunch is that the default
>>>>>>> file system in the driver is probably the local file system. Could you add the
>>>>>>> following line to your code to print the default file system?
>>>>>>>
>>>>>>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration).getClass)
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 5:59 PM, kant kodali <ka...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as
>>>>>>>> you can see below, but I don't see a checkpoint directory under my
>>>>>>>> hadoop_home=/usr/local/hadoop on either the datanodes or the namenode.
>>>>>>>> However, on the datanode machine there seems to be some data under
>>>>>>>>
>>>>>>>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.
>>>>>>>> X-1495672725898/current/finalized/subdir0/subdir0
>>>>>>>>
>>>>>>>> I thought the checkpoint directory would be created by Spark once I
>>>>>>>> specify the path, but do I need to manually create the checkpoint dir using
>>>>>>>> mkdir on all Spark worker machines? I am new to HDFS as well, so please let
>>>>>>>> me know. I can try sending df.explain("true"), but I have 100 fields in my
>>>>>>>> schema so the Project looks really big; if that is not a problem for you
>>>>>>>> I can send it as well.
>>>>>>>>
>>>>>>>>
>>>>>>>>   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> X.X.X.X:9092, checkpointLocation -> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>>>>>>>>
>>>>>>>> *Here is the stack trace*
>>>>>>>>
>>>>>>>> StructField(OptionalContext4,StringType,true), StructField(OptionalContext5,StringType,true)),true), cast(value#1 as string)) AS payload#15]
>>>>>>>>             +- StreamingExecutionRelation KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>>>>>>>>
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
>>>>>>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost task 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0): java.lang.IllegalStateException: Error reading delta file /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.io.FileNotFoundException: File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>>>
>>>>>>>>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
>>>>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>>>>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>>>>>>>>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>>>>>>>>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>>>>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>>>>>>>>     ... 21 more
>>>>>>>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>>>
>>>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>>>>>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>>>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>>>>>>>>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>>>>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>>>>>     ... 33 more
>>>>>>>>
>>>>>>>> Driver stacktrace:
>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>>>>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>>>>>>>>     at scala.Option.foreach(Option.scala:257)
>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
>>>>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
>>>>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>>>>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>>>>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>>>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
>>>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
>>>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
>>>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
>>>>>>>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
>>>>>>>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>>>>>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>>>>>>>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>>>>>>>     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:554)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:553)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:273)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:262)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:257)
>>>>>>>>     ... 1 more
>>>>>>>> Caused by: java.lang.IllegalStateException: Error reading delta file /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.io.FileNotFoundException: File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>>>
>>>>>>>>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
>>>>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>>>>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>>>>>>>>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>>>>>>>>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>>>>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>>>>>>>>     ... 21 more
>>>>>>>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>>>
>>>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>>>>>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>>>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>>>>>>>>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>>>>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu <
>>>>>>>> shixiong@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> What's the value of "hdfsCheckPointDir"? Could you list this
>>>>>>>>> directory on HDFS and report the files there?
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <
>>>>>>>>> michael@databricks.com> wrote:
>>>>>>>>>
>>>>>>>>>> -dev
>>>>>>>>>>
>>>>>>>>>> Have you tried clearing out the checkpoint directory?  Can you
>>>>>>>>>> also give the full stack trace?
>>>>>>>>>>
>>>>>>>>>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <ka...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Even if I do simple count aggregation like below I get the same
>>>>>>>>>>> error as https://issues.apache.org/jira/browse/SPARK-19268
>>>>>>>>>>>
>>>>>>>>>>> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <kanth909@gmail.com
>>>>>>>>>>> > wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> I am using Spark 2.1.1 and running in a Standalone mode using
>>>>>>>>>>>> HDFS and Kafka
>>>>>>>>>>>>
>>>>>>>>>>>> I am running into the same problem as
>>>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my
>>>>>>>>>>>> app(not KafkaWordCount).
>>>>>>>>>>>>
>>>>>>>>>>>> Here is my sample code
>>>>>>>>>>>>
>>>>>>>>>>>> *Here is how I create ReadStream*
>>>>>>>>>>>>
>>>>>>>>>>>> sparkSession.readStream()
>>>>>>>>>>>>                 .format("kafka")
>>>>>>>>>>>>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>>>>>>>>>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>>>>>>>>>>>                 .option("startingOffsets", "earliest")
>>>>>>>>>>>>                 .option("failOnDataLoss", "false")
>>>>>>>>>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>>>>>>>>>                 .load();
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> *The core logic*
>>>>>>>>>>>>
>>>>>>>>>>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>>>>>>>>>>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>>>>>>>>>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>>>>>>>>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>>>>>>>>>>>> query.awaitTermination();
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I can also provide any other information you may need.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by kant kodali <ka...@gmail.com>.
Should I file a ticket or should I try another version like Spark 2.2 since
I am currently using 2.1.1?

On Thu, May 25, 2017 at 2:38 PM, kant kodali <ka...@gmail.com> wrote:

> Hi Ryan,
>
> You are right, I was setting checkpointLocation on readStream. Now I have
> set it for writeStream as well, like below:
>
> StreamingQuery query = df2.writeStream().foreach(new KafkaSink())
>         .option("checkpointLocation", "/usr/local/hadoop/checkpoint")
>         .outputMode("update").start();
>
> query.awaitTermination();
>
> *and now I can at very least see there are directories like*
>
> -rw-r--r--   2 ubuntu supergroup         45 2017-05-25 21:29 /usr/local/hadoop/checkpoint/metadata
> drwxr-xr-x   - ubuntu supergroup          0 2017-05-25 21:30 /usr/local/hadoop/checkpoint/offsets
> drwxr-xr-x   - ubuntu supergroup          0 2017-05-25 21:29 /usr/local/hadoop/checkpoint/sources
> drwxr-xr-x   - ubuntu supergroup          0 2017-05-25 21:30 /usr/local/hadoop/checkpoint/state
>
>
> However it still fails with
>
> *org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /usr/local/hadoop/checkpoint/state/0/1/1.delta*
>
>
>
> On Thu, May 25, 2017 at 2:31 PM, Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> Read your code again and found one issue: you set "checkpointLocation"
>> in `readStream`. It should be set in `writeStream`. However, I still have
>> no idea why using a temp checkpoint location would fail.
>>
>> On Thu, May 25, 2017 at 2:23 PM, kant kodali <ka...@gmail.com> wrote:
>>
>>> I did the following
>>>
>>> *bin/hadoop fs -mkdir -p **/usr/local/hadoop/checkpoint* and did *bin/hadoop
>>> fs -ls / *
>>>
>>> and I can actually see */tmp* and */usr* and inside of */usr *there is
>>> indeed *local/hadoop/checkpoint. *
>>>
>>> So until here it looks fine.
>>>
>>> I also cleared everything */tmp/** as @Michael suggested using *bin/hadoop
>>> fs -rmdir * such that when I do *bin/hadoop fs -ls /tmp *I don't see
>>> anything.
>>>
>>> Now I ran my Spark driver program using spark-submit and it failed with the
>>> following exception:
>>>
>>> *File does not exist:
>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*
>>>
>>> so I did *bin/hadoop fs -ls *
>>> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2 *
>>>
>>> and I did not see anything there like *1.delta(there are just no files)* however
>>> all these directories */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
>>>  *do exist.
>>>
>>> For my checkPointLocation I had passed   */usr/local/hadoop/checkpoint *
>>> and  *hdfs://<namenode:port>/usr/local/hadoop/checkpoint  *so far and
>>> both didn't work for me. It is failing with the same error "*File does
>>> not exist:
>>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*"
>>>
>>> so what can be the problem? any ideas?
>>>
>>> Thanks!
>>>
>>> On Thu, May 25, 2017 at 1:31 AM, kant kodali <ka...@gmail.com> wrote:
>>>
>>>> Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>>>>
>>>> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>>>>
>>>> This is what I expected as well, since I don't see any checkpoint
>>>> directory under /usr/local/hadoop. Am I missing any configuration variable
>>>> like HADOOP_CONF_DIR? I am currently not setting that in
>>>> conf/spark-env.sh, and that's the only Hadoop-related environment variable I
>>>> see. Please let me know.
>>>>
>>>> thanks!
>>>>
>>>>
>>>>
>>>> On Thu, May 25, 2017 at 1:19 AM, kant kodali <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Ryan,
>>>>>
>>>>> I did add that print statement and here is what I got.
>>>>>
>>>>> class org.apache.hadoop.hdfs.DistributedFileSystem
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
>>>>> shixiong@databricks.com> wrote:
>>>>>
>>>>>> I meant using an HDFS command to check the directory, such as
>>>>>> "bin/hadoop fs -ls /usr/local/hadoop/checkpoint". My hunch is that the default
>>>>>> file system in the driver is probably the local file system. Could you add the
>>>>>> following line to your code to print the default file system?
>>>>>>
>>>>>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration).getClass)
>>>>>>
>>>>>> On Wed, May 24, 2017 at 5:59 PM, kant kodali <ka...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you
>>>>>>> can see below, but I don't see a checkpoint directory under my hadoop_home=
>>>>>>> /usr/local/hadoop on either the datanodes or the namenode. However, on the
>>>>>>> datanode machine there seems to be some data under
>>>>>>>
>>>>>>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.
>>>>>>> X-1495672725898/current/finalized/subdir0/subdir0
>>>>>>>
>>>>>>> I thought the checkpoint directory would be created by Spark once I
>>>>>>> specify the path, but do I need to manually create the checkpoint dir using
>>>>>>> mkdir on all Spark worker machines? I am new to HDFS as well, so please let
>>>>>>> me know. I can try sending df.explain("true"), but I have 100 fields in my
>>>>>>> schema so the Project looks really big; if that is not a problem for you
>>>>>>> I can send it as well.
>>>>>>>
>>>>>>>
>>>>>>>   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> X.X.X.X:9092, checkpointLocation -> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>>>>>>>
>>>>>>> *Here is the stack trace*
>>>>>>>
>>>>>>> StructField(OptionalContext4,StringType,true), StructField(OptionalContext5,StringType,true)),true), cast(value#1 as string)) AS payload#15]
>>>>>>>             +- StreamingExecutionRelation KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>>>>>>>
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
>>>>>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost task 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0): java.lang.IllegalStateException: Error reading delta file /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.io.FileNotFoundException: File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>>
>>>>>>>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
>>>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>>>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>>>>>>>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>>>>>>>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>>>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>>>>>>>     ... 21 more
>>>>>>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>>
>>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>>>>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>>>>>>>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>>>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>>>>     ... 33 more
>>>>>>>
>>>>>>> Driver stacktrace:
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>>>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>>>>>>>     at scala.Option.foreach(Option.scala:257)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
>>>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
>>>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>>>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>>>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
>>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
>>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
>>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
>>>>>>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
>>>>>>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>>>>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>>>>>>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>>>>>>     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>>>>>>>     at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:554)
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>>>>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:553)
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:273)
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>>>>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:262)
>>>>>>>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:257)
>>>>>>>     ... 1 more
>>>>>>> Caused by: java.lang.IllegalStateException: Error reading delta file /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.io.FileNotFoundException: File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>>
>>>>>>>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
>>>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>>>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>>>>>>>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>>>>>>>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>>>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>>>>>>>     ... 21 more
>>>>>>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>>
>>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>>>>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>>>>>>>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>>>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>>>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu <
>>>>>>> shixiong@databricks.com> wrote:
>>>>>>>
>>>>>>>> What's the value of "hdfsCheckPointDir"? Could you list this
>>>>>>>> directory on HDFS and report the files there?
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <
>>>>>>>> michael@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> -dev
>>>>>>>>>
>>>>>>>>> Have you tried clearing out the checkpoint directory?  Can you
>>>>>>>>> also give the full stack trace?
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <ka...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Even if I do simple count aggregation like below I get the same
>>>>>>>>>> error as https://issues.apache.org/jira/browse/SPARK-19268
>>>>>>>>>>
>>>>>>>>>> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <ka...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I am using Spark 2.1.1 and running in a Standalone mode using
>>>>>>>>>>> HDFS and Kafka
>>>>>>>>>>>
>>>>>>>>>>> I am running into the same problem as
>>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my
>>>>>>>>>>> app(not KafkaWordCount).
>>>>>>>>>>>
>>>>>>>>>>> Here is my sample code
>>>>>>>>>>>
>>>>>>>>>>> *Here is how I create ReadStream*
>>>>>>>>>>>
>>>>>>>>>>> sparkSession.readStream()
>>>>>>>>>>>                 .format("kafka")
>>>>>>>>>>>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>>>>>>>>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>>>>>>>>>>                 .option("startingOffsets", "earliest")
>>>>>>>>>>>                 .option("failOnDataLoss", "false")
>>>>>>>>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>>>>>>>>                 .load();
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *The core logic*
>>>>>>>>>>>
>>>>>>>>>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>>>>>>>>>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>>>>>>>>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>>>>>>>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>>>>>>>>>>> query.awaitTermination();
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I can also provide any other information you may need.
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by kant kodali <ka...@gmail.com>.
Hi Ryan,

You are right, I was setting checkpointLocation for readStream. Now I have
set it for writeStream as well, like below:

StreamingQuery query = df2.writeStream()
        .foreach(new KafkaSink())
        .option("checkpointLocation", "/usr/local/hadoop/checkpoint")
        .outputMode("update")
        .start();

query.awaitTermination();

*and now I can at the very least see that there are directories like these:*

-rw-r--r--   2 ubuntu supergroup         45 2017-05-25 21:29
/usr/local/hadoop/checkpoint/metadata
drwxr-xr-x   - ubuntu supergroup          0 2017-05-25 21:30
/usr/local/hadoop/checkpoint/offsets
drwxr-xr-x   - ubuntu supergroup          0 2017-05-25 21:29
/usr/local/hadoop/checkpoint/sources
drwxr-xr-x   - ubuntu supergroup          0 2017-05-25 21:30
/usr/local/hadoop/checkpoint/state


However, it still fails with

*org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
File does not exist: /usr/local/hadoop/checkpoint/state/0/1/1.delta*
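
To help narrow this down, I can also run something like the rough sketch
below from the driver, to confirm which filesystem the checkpoint actually
resolves to and whether any delta files exist under it (this reuses the
sparkSession variable and the checkpoint path from the snippets above; I
have not run it against the cluster yet):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Print the default filesystem the driver resolves (expecting hdfs://..., not file:///)
Configuration hadoopConf = sparkSession.sparkContext().hadoopConfiguration();
FileSystem fs = FileSystem.get(hadoopConf);
System.out.println("Default FS: " + fs.getUri());

// List whatever is under the state directory of the checkpoint
Path stateDir = new Path("/usr/local/hadoop/checkpoint/state/0/1");
if (fs.exists(stateDir)) {
    for (FileStatus s : fs.listStatus(stateDir)) {
        System.out.println(s.getPath() + "  " + s.getLen() + " bytes");
    }
} else {
    System.out.println(stateDir + " does not exist on " + fs.getUri());
}

If the driver and the executors end up resolving different default
filesystems (say file:/// on one side and hdfs:// on the other), the delta
files would be written somewhere the next task cannot see them, which would
be consistent with the error above.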



On Thu, May 25, 2017 at 2:31 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com
> wrote:

> I read your code again and found one issue: you set "checkpointLocation" in
> `readStream`. It should be set in `writeStream`. However, I still have no
> idea why using a temp checkpoint location would fail.
>
> On Thu, May 25, 2017 at 2:23 PM, kant kodali <ka...@gmail.com> wrote:
>
>> I did the following
>>
>> *bin/hadoop fs -mkdir -p **/usr/local/hadoop/checkpoint* and did *bin/hadoop
>> fs -ls / *
>>
>> and I can actually see */tmp* and */usr* and inside of */usr *there is
>> indeed *local/hadoop/checkpoint. *
>>
>> So until here it looks fine.
>>
>> I also cleared everything */tmp/** as @Michael suggested using *bin/hadoop
>> fs -rmdir * such that when I do *bin/hadoop fs -ls /tmp *I don't see
>> anything.
>>
>> Now when I ran my spark driver program using spark-submit, it failed with
>> the following exception
>>
>> *File does not exist:
>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*
>>
>> so I did *bin/hadoop fs -ls *
>> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2 *
>>
>> and I did not see anything there like *1.delta(there are just no files)* however
>> all these directories */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
>>  *do exist.
>>
>> For my checkPointLocation I had passed   */usr/local/hadoop/checkpoint *
>> and  *hdfs://<namenode:port>/usr/local/hadoop/checkpoint  *so far and
>> both didn't work for me. It is failing with the same error "*File does
>> not exist:
>> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*"
>>
>> so what can be the problem? any ideas?
>>
>> Thanks!
>>
>> On Thu, May 25, 2017 at 1:31 AM, kant kodali <ka...@gmail.com> wrote:
>>
>>> Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>>>
>>> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>>>
>>> This is what I expected as well since I don't see any checkpoint
>>> directory under /usr/local/hadoop. Am I missing any configuration variable
>>> like HADOOP_CONF_DIR ? I am currently not setting that in
>>> conf/spark-env.sh and thats the only hadoop related environment variable I
>>> see. please let me know
>>>
>>> thanks!
>>>
>>>
>>>
>>> On Thu, May 25, 2017 at 1:19 AM, kant kodali <ka...@gmail.com> wrote:
>>>
>>>> Hi Ryan,
>>>>
>>>> I did add that print statement and here is what I got.
>>>>
>>>> class org.apache.hadoop.hdfs.DistributedFileSystem
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
>>>> shixiong@databricks.com> wrote:
>>>>
>>>>> I meant using HDFS command to check the directory. Such as "bin/hadoop
>>>>> fs -ls /usr/local/hadoop/checkpoint". My hunch is the default file system
>>>>> in driver probably is the local file system. Could you add the following
>>>>> line into your code to print the default file system?
>>>>>
>>>>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration).getClass)
>>>>>
>>>>> On Wed, May 24, 2017 at 5:59 PM, kant kodali <ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you
>>>>>> can see below however I dont see checkpoint directory under my hadoop_home=
>>>>>> /usr/local/hadoop in either datanodes or namenodes however in
>>>>>> datanode machine there seems to be some data under
>>>>>>
>>>>>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.X-1495672725898/current/finalized/subdir0/subdir0
>>>>>>
>>>>>> I thought the checkpoint directory will be created by spark once I
>>>>>> specify the path but do I manually need to create checkpoint dir using
>>>>>> mkdir in all spark worker machines? I am new to HDFS as well so please let
>>>>>> me know. I can try sending df.explain("true") but I have 100 fields in my
>>>>>> schema so Project looks really big and if this is not a problem for you
>>>>>> guys I can send that as well.
>>>>>>
>>>>>>
>>>>>>   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> X.X.X.X:9092 <http://172.31.41.30:9092>, checkpointLocation -> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>>>>>>
>>>>>> *Here is the stack trace*
>>>>>>
>>>>>> StructField(OptionalContext4,StringType,true), StructField(OptionalContext5,StringType,true)),true), cast(value#1 as string)) AS payload#15]
>>>>>>             +- StreamingExecutionRelation KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>>>>>>
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
>>>>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost task 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0): java.lang.IllegalStateException: Error reading delta file /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.io.FileNotFoundException: File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>
>>>>>>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
>>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>>>>>>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>>>>>>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>>>>>>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>>>>>>     ... 21 more
>>>>>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>
>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>>>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>>>>>>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>>>     ... 33 more
>>>>>>
>>>>>> Driver stacktrace:
>>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>>>>>>     at scala.Option.foreach(Option.scala:257)
>>>>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
>>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
>>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
>>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
>>>>>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
>>>>>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>>>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>>>>>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>>>>>     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>>>>>>     at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:554)
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>>>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:553)
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:273)
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>>>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:262)
>>>>>>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:257)
>>>>>>     ... 1 more
>>>>>> Caused by: java.lang.IllegalStateException: Error reading delta file /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.io.FileNotFoundException: File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>
>>>>>>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
>>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>>>>>>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>>>>>>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>>>>>>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>>>>>>     ... 21 more
>>>>>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>>
>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>>>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>>>>>>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>>>
>>>>>>
>>>>>> On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu <
>>>>>> shixiong@databricks.com> wrote:
>>>>>>
>>>>>>> What's the value of "hdfsCheckPointDir"? Could you list this
>>>>>>> directory on HDFS and report the files there?
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <
>>>>>>> michael@databricks.com> wrote:
>>>>>>>
>>>>>>>> -dev
>>>>>>>>
>>>>>>>> Have you tried clearing out the checkpoint directory?  Can you also
>>>>>>>> give the full stack trace?
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <ka...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Even if I do simple count aggregation like below I get the same
>>>>>>>>> error as https://issues.apache.org/jira/browse/SPARK-19268
>>>>>>>>>
>>>>>>>>> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <ka...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I am using Spark 2.1.1 and running in a Standalone mode using
>>>>>>>>>> HDFS and Kafka
>>>>>>>>>>
>>>>>>>>>> I am running into the same problem as
>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my
>>>>>>>>>> app(not KafkaWordCount).
>>>>>>>>>>
>>>>>>>>>> Here is my sample code
>>>>>>>>>>
>>>>>>>>>> *Here is how I create ReadStream*
>>>>>>>>>>
>>>>>>>>>> sparkSession.readStream()
>>>>>>>>>>                 .format("kafka")
>>>>>>>>>>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>>>>>>>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>>>>>>>>>                 .option("startingOffsets", "earliest")
>>>>>>>>>>                 .option("failOnDataLoss", "false")
>>>>>>>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>>>>>>>                 .load();
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *The core logic*
>>>>>>>>>>
>>>>>>>>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>>>>>>>>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>>>>>>>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>>>>>>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>>>>>>>>>> query.awaitTermination();
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I can also provide any other information you may need.
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
I read your code again and found one issue: you set "checkpointLocation" in
`readStream`. It should be set in `writeStream`. However, I still have no
idea why using a temp checkpoint location would fail.
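
In code, that amounts to moving the option onto the query, roughly like the
sketch below (it reuses df2, KafkaSink and hdfsCheckPointDir from your
earlier snippets; untested on my side):

// The checkpoint location belongs to the query, so set it on writeStream():
StreamingQuery query = df2.writeStream()
        .foreach(new KafkaSink())
        .option("checkpointLocation", hdfsCheckPointDir)  // e.g. hdfs://<namenode:port>/usr/local/hadoop/checkpoint
        .outputMode("update")
        .start();
query.awaitTermination();

// The .option("checkpointLocation", ...) on readStream() can be dropped; the
// query does not pick it up from the source options.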

On Thu, May 25, 2017 at 2:23 PM, kant kodali <ka...@gmail.com> wrote:

> I did the following
>
> *bin/hadoop fs -mkdir -p **/usr/local/hadoop/checkpoint* and did *bin/hadoop
> fs -ls / *
>
> and I can actually see */tmp* and */usr* and inside of */usr *there is
> indeed *local/hadoop/checkpoint. *
>
> So until here it looks fine.
>
> I also cleared everything */tmp/** as @Michael suggested using *bin/hadoop
> fs -rmdir * such that when I do *bin/hadoop fs -ls /tmp *I don't see
> anything.
>
> Now when I ran my spark driver program using spark-submit, it failed with
> the following exception
>
> *File does not exist:
> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*
>
> so I did *bin/hadoop fs -ls *
> */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2 *
>
> and I did not see anything there like *1.delta(there are just no files)* however
> all these directories */tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2
>  *do exist.
>
> For my checkPointLocation I had passed   */usr/local/hadoop/checkpoint *
> and  *hdfs://<namenode:port>/usr/local/hadoop/checkpoint  *so far and
> both didn't work for me. It is failing with the same error "*File does
> not exist:
> /tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*"
>
> so what can be the problem? any ideas?
>
> Thanks!
>
> On Thu, May 25, 2017 at 1:31 AM, kant kodali <ka...@gmail.com> wrote:
>
>> Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>>
>> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>>
>> This is what I expected as well since I don't see any checkpoint
>> directory under /usr/local/hadoop. Am I missing any configuration variable
>> like HADOOP_CONF_DIR ? I am currently not setting that in
>> conf/spark-env.sh and thats the only hadoop related environment variable I
>> see. please let me know
>>
>> thanks!
>>
>>
>>
>> On Thu, May 25, 2017 at 1:19 AM, kant kodali <ka...@gmail.com> wrote:
>>
>>> Hi Ryan,
>>>
>>> I did add that print statement and here is what I got.
>>>
>>> class org.apache.hadoop.hdfs.DistributedFileSystem
>>>
>>> Thanks!
>>>
>>> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
>>> shixiong@databricks.com> wrote:
>>>
>>>> I meant using HDFS command to check the directory. Such as "bin/hadoop
>>>> fs -ls /usr/local/hadoop/checkpoint". My hunch is the default file system
>>>> in driver probably is the local file system. Could you add the following
>>>> line into your code to print the default file system?
>>>>
>>>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration).getClass)
>>>>
>>>> On Wed, May 24, 2017 at 5:59 PM, kant kodali <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you
>>>>> can see below however I dont see checkpoint directory under my hadoop_home=
>>>>> /usr/local/hadoop in either datanodes or namenodes however in
>>>>> datanode machine there seems to be some data under
>>>>>
>>>>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.X-1495672725898/current/finalized/subdir0/subdir0
>>>>>
>>>>> I thought the checkpoint directory will be created by spark once I
>>>>> specify the path but do I manually need to create checkpoint dir using
>>>>> mkdir in all spark worker machines? I am new to HDFS as well so please let
>>>>> me know. I can try sending df.explain("true") but I have 100 fields in my
>>>>> schema so Project looks really big and if this is not a problem for you
>>>>> guys I can send that as well.
>>>>>
>>>>>
>>>>>   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> X.X.X.X:9092 <http://172.31.41.30:9092>, checkpointLocation -> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>>>>>
>>>>> *Here is the stack trace*
>>>>>
>>>>> StructField(OptionalContext4,StringType,true), StructField(OptionalContext5,StringType,true)),true), cast(value#1 as string)) AS payload#15]
>>>>>             +- StreamingExecutionRelation KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>>>>>
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
>>>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost task 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0): java.lang.IllegalStateException: Error reading delta file /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.io.FileNotFoundException: File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>
>>>>>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>>>>>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>>>>>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>>>>>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>>>>>     ... 21 more
>>>>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>
>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>>>>>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>>     ... 33 more
>>>>>
>>>>> Driver stacktrace:
>>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>>>>>     at scala.Option.foreach(Option.scala:257)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
>>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
>>>>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
>>>>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>>>>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>>>>     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>>>>>     at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:554)
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:553)
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:273)
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:262)
>>>>>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:257)
>>>>>     ... 1 more
>>>>> Caused by: java.lang.IllegalStateException: Error reading delta file /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>>>>>     at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.io.FileNotFoundException: File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>
>>>>>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>>>>>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>>>>>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>>>>>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>>>>>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>>>>>     ... 21 more
>>>>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>>
>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>>>>>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>>
>>>>>
>>>>> On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu <
>>>>> shixiong@databricks.com> wrote:
>>>>>
>>>>>> What's the value of "hdfsCheckPointDir"? Could you list this
>>>>>> directory on HDFS and report the files there?
>>>>>>
>>>>>> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <
>>>>>> michael@databricks.com> wrote:
>>>>>>
>>>>>>> -dev
>>>>>>>
>>>>>>> Have you tried clearing out the checkpoint directory?  Can you also
>>>>>>> give the full stack trace?
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <ka...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Even if I do simple count aggregation like below I get the same
>>>>>>>> error as https://issues.apache.org/jira/browse/SPARK-19268
>>>>>>>>
>>>>>>>> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <ka...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS
>>>>>>>>> and Kafka
>>>>>>>>>
>>>>>>>>> I am running into the same problem as
>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my app(not
>>>>>>>>> KafkaWordCount).
>>>>>>>>>
>>>>>>>>> Here is my sample code
>>>>>>>>>
>>>>>>>>> *Here is how I create ReadStream*
>>>>>>>>>
>>>>>>>>> sparkSession.readStream()
>>>>>>>>>                 .format("kafka")
>>>>>>>>>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>>>>>>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>>>>>>>>                 .option("startingOffsets", "earliest")
>>>>>>>>>                 .option("failOnDataLoss", "false")
>>>>>>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>>>>>>                 .load();
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *The core logic*
>>>>>>>>>
>>>>>>>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>>>>>>>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>>>>>>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>>>>>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>>>>>>>>> query.awaitTermination();
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I can also provide any other information you may need.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by kant kodali <ka...@gmail.com>.
I did the following

*bin/hadoop fs -mkdir -p /usr/local/hadoop/checkpoint* and then did
*bin/hadoop fs -ls /*

and I can actually see */tmp* and */usr*, and inside of */usr* there is
indeed *local/hadoop/checkpoint*.

So far it looks fine.

I also cleared out everything under /tmp/* as @Michael suggested, using
*bin/hadoop fs -rmdir*, so that when I do *bin/hadoop fs -ls /tmp* I don't
see anything.

Now I ran my Spark driver program using spark-submit and it failed with the
following exception:

*File does not exist:
/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*

So I did *bin/hadoop fs -ls
/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2*

and I did not see anything like *1.delta* there (there are just no files);
however, all of the parent directories up to
*/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2* do exist.
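
In case it helps with debugging, here is a small sketch (same Java API as my
app, assuming my existing SparkSession named sparkSession) of how I could list
that state directory from inside the driver, through whatever Hadoop
FileSystem the job actually resolves:

org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem
        .get(sparkSession.sparkContext().hadoopConfiguration());
org.apache.hadoop.fs.Path stateDir = new org.apache.hadoop.fs.Path(
        "/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2");
// Should print 1.delta (and any snapshot files) if the state store ever wrote here
for (org.apache.hadoop.fs.FileStatus status : fs.listStatus(stateDir)) {
    System.out.println(status.getPath());
}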

For my checkpointLocation I have passed */usr/local/hadoop/checkpoint* and
*hdfs://<namenode:port>/usr/local/hadoop/checkpoint* so far, and neither
worked for me. It keeps failing with the same error: "*File does not exist:
/tmp/temporary-c675a900-eee5-4bb0-a7d3-098dcef3ae53/state/0/2/1.delta*"
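
Separately, here is a minimal sketch of passing the checkpoint location to the
writer instead of the reader (same Java API, reusing the hdfsCheckPointDir and
KafkaSink placeholders from my earlier mail; df2 is the aggregated Dataset),
in case the option set on readStream() is simply being ignored for the query's
state:

StreamingQuery query = df2.writeStream()
        .outputMode("update")
        // e.g. "hdfs://<namenode:port>/usr/local/hadoop/checkpoint"
        .option("checkpointLocation", hdfsCheckPointDir)
        .foreach(new KafkaSink())
        .start();
query.awaitTermination();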

So what could be the problem? Any ideas?

Thanks!

On Thu, May 25, 2017 at 1:31 AM, kant kodali <ka...@gmail.com> wrote:

> Executing this bin/hadoop fs -ls /usr/local/hadoop/checkpoint says
>
> ls: `/usr/local/hadoop/checkpoint': No such file or directory
>
> This is what I expected as well since I don't see any checkpoint directory
> under /usr/local/hadoop. Am I missing any configuration variable like
> HADOOP_CONF_DIR ? I am currently not setting that in conf/spark-env.sh
> and thats the only hadoop related environment variable I see. please let me
> know
>
> thanks!
>
>
>
> On Thu, May 25, 2017 at 1:19 AM, kant kodali <ka...@gmail.com> wrote:
>
>> Hi Ryan,
>>
>> I did add that print statement and here is what I got.
>>
>> class org.apache.hadoop.hdfs.DistributedFileSystem
>>
>> Thanks!
>>
>> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>>> I meant using HDFS command to check the directory. Such as "bin/hadoop
>>> fs -ls /usr/local/hadoop/checkpoint". My hunch is the default file system
>>> in driver probably is the local file system. Could you add the following
>>> line into your code to print the default file system?
>>>
>>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfigu
>>> ration).getClass)
>>>
>>> On Wed, May 24, 2017 at 5:59 PM, kant kodali <ka...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you
>>>> can see below however I dont see checkpoint directory under my hadoop_home=
>>>> /usr/local/hadoop in either datanodes or namenodes however in datanode
>>>> machine there seems to be some data under
>>>>
>>>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.
>>>> X-1495672725898/current/finalized/subdir0/subdir0
>>>>
>>>> I thought the checkpoint directory will be created by spark once I
>>>> specify the path but do I manually need to create checkpoint dir using
>>>> mkdir in all spark worker machines? I am new to HDFS as well so please let
>>>> me know. I can try sending df.explain("true") but I have 100 fields in my
>>>> schema so Project looks really big and if this is not a problem for you
>>>> guys I can send that as well.
>>>>
>>>>
>>>>   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> X.X.X.X:9092, checkpointLocation -> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>>>>
>>>> *Here is the stack trace*
>>>>
>>>> StructField(OptionalContext4,StringType,true), StructField(OptionalContext5,StringType,true)),true), cast(value#1 as string)) AS payload#15]
>>>>             +- StreamingExecutionRelation KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>>>>
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
>>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost task 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0): java.lang.IllegalStateException: Error reading delta file /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>>>>     at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>>>>     at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.io.FileNotFoundException: File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>
>>>>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>>>>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>>>>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>>>>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>>>>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>>>>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>>>>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>>>>     ... 21 more
>>>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>
>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>>>>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>     ... 33 more
>>>>
>>>> Driver stacktrace:
>>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>>>>     at scala.Option.foreach(Option.scala:257)
>>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>>>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
>>>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
>>>>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>>>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>>>>     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>>>>     at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:554)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:553)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:273)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>>>>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:262)
>>>>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:257)
>>>>     ... 1 more
>>>> Caused by: java.lang.IllegalStateException: Error reading delta file /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>>>>     at scala.Option.getOrElse(Option.scala:121)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>>>>     at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>>>>     at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.io.FileNotFoundException: File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>
>>>>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>>>>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>>>>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>>>>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>>>>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>>>>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>>>>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>>>>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>>>>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>>>>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>>>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>>>>     ... 21 more
>>>> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>>>>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>>>>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>>>>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>>>>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>>>>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>>>>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>>>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>>>>
>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>>>>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>>>>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>>>>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>>>>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>>>>
>>>>
>>>> On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu <
>>>> shixiong@databricks.com> wrote:
>>>>
>>>>> What's the value of "hdfsCheckPointDir"? Could you list this directory
>>>>> on HDFS and report the files there?
>>>>>
>>>>> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <
>>>>> michael@databricks.com> wrote:
>>>>>
>>>>>> -dev
>>>>>>
>>>>>> Have you tried clearing out the checkpoint directory?  Can you also
>>>>>> give the full stack trace?
>>>>>>
>>>>>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <ka...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Even if I do simple count aggregation like below I get the same
>>>>>>> error as https://issues.apache.org/jira/browse/SPARK-19268
>>>>>>>
>>>>>>> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <ka...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS
>>>>>>>> and Kafka
>>>>>>>>
>>>>>>>> I am running into the same problem as
>>>>>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my app(not
>>>>>>>> KafkaWordCount).
>>>>>>>>
>>>>>>>> Here is my sample code
>>>>>>>>
>>>>>>>> *Here is how I create ReadStream*
>>>>>>>>
>>>>>>>> sparkSession.readStream()
>>>>>>>>                 .format("kafka")
>>>>>>>>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>>>>>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>>>>>>>                 .option("startingOffsets", "earliest")
>>>>>>>>                 .option("failOnDataLoss", "false")
>>>>>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>>>>>                 .load();
>>>>>>>>
>>>>>>>>
>>>>>>>> *The core logic*
>>>>>>>>
>>>>>>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>>>>>>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>>>>>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>>>>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>>>>>>>> query.awaitTermination();
>>>>>>>>
>>>>>>>>
>>>>>>>> I can also provide any other information you may need.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by kant kodali <ka...@gmail.com>.
Executing bin/hadoop fs -ls /usr/local/hadoop/checkpoint says

ls: `/usr/local/hadoop/checkpoint': No such file or directory

This is what I expected as well, since I don't see any checkpoint directory
under /usr/local/hadoop. Am I missing a configuration variable like
HADOOP_CONF_DIR? I am currently not setting that in conf/spark-env.sh, and
that's the only Hadoop-related environment variable I see. Please let me know.
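
For what it is worth, here is a small Java version of the default file system
check Ryan suggested earlier (a sketch that assumes my existing SparkSession
named sparkSession), which would also show which fs.defaultFS the driver
resolves:

org.apache.hadoop.conf.Configuration hadoopConf =
        sparkSession.sparkContext().hadoopConfiguration();
// e.g. hdfs://<namenode:port> when HDFS is the default, or file:/// for the local FS
System.out.println(hadoopConf.get("fs.defaultFS"));
System.out.println(org.apache.hadoop.fs.FileSystem.get(hadoopConf).getClass());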

Thanks!



On Thu, May 25, 2017 at 1:19 AM, kant kodali <ka...@gmail.com> wrote:

> Hi Ryan,
>
> I did add that print statement and here is what I got.
>
> class org.apache.hadoop.hdfs.DistributedFileSystem
>
> Thanks!
>
> On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> I meant using HDFS command to check the directory. Such as "bin/hadoop fs
>> -ls /usr/local/hadoop/checkpoint". My hunch is the default file system in
>> driver probably is the local file system. Could you add the following line
>> into your code to print the default file system?
>>
>> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfigu
>> ration).getClass)
>>
>> On Wed, May 24, 2017 at 5:59 PM, kant kodali <ka...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you can
>>> see below however I dont see checkpoint directory under my hadoop_home=
>>> /usr/local/hadoop in either datanodes or namenodes however in datanode
>>> machine there seems to be some data under
>>>
>>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.
>>> X-1495672725898/current/finalized/subdir0/subdir0
>>>
>>> I thought the checkpoint directory will be created by spark once I
>>> specify the path but do I manually need to create checkpoint dir using
>>> mkdir in all spark worker machines? I am new to HDFS as well so please let
>>> me know. I can try sending df.explain("true") but I have 100 fields in my
>>> schema so Project looks really big and if this is not a problem for you
>>> guys I can send that as well.
>>>
>>>
>>>   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> X.X.X.X:9092, checkpointLocation -> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>>>
>>> *Here is the stack trace*
>>>
>>> [full stack trace snipped; identical to the trace quoted in full later in the thread]
>>>
>>>
>>> On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu <
>>> shixiong@databricks.com> wrote:
>>>
>>>> What's the value of "hdfsCheckPointDir"? Could you list this directory
>>>> on HDFS and report the files there?
>>>>
>>>> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <
>>>> michael@databricks.com> wrote:
>>>>
>>>>> -dev
>>>>>
>>>>> Have you tried clearing out the checkpoint directory?  Can you also
>>>>> give the full stack trace?
>>>>>
>>>>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Even if I do simple count aggregation like below I get the same error
>>>>>> as https://issues.apache.org/jira/browse/SPARK-19268
>>>>>>
>>>>>> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>>>>>>
>>>>>>
>>>>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <ka...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS
>>>>>>> and Kafka
>>>>>>>
>>>>>>> I am running into the same problem as
>>>>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my app (not
>>>>>>> KafkaWordCount).
>>>>>>>
>>>>>>> Here is my sample code
>>>>>>>
>>>>>>> *Here is how I create ReadStream*
>>>>>>>
>>>>>>> sparkSession.readStream()
>>>>>>>                 .format("kafka")
>>>>>>>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>>>>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>>>>>>                 .option("startingOffsets", "earliest")
>>>>>>>                 .option("failOnDataLoss", "false")
>>>>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>>>>                 .load();
>>>>>>>
>>>>>>>
>>>>>>> *The core logic*
>>>>>>>
>>>>>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>>>>>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>>>>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>>>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>>>>>>> query.awaitTermination();
>>>>>>>
>>>>>>>
>>>>>>> I can also provide any other information you may need.
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by kant kodali <ka...@gmail.com>.
Hi Ryan,

I did add that print statement and here is what I got.

class org.apache.hadoop.hdfs.DistributedFileSystem
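
Since the default file system resolves to HDFS here, one further sanity
check is to qualify the checkpoint path against that file system from the
driver and see whether it exists yet. This is only a sketch, assuming a
SparkSession named sparkSession and the checkpoint path used elsewhere in
this thread:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Resolve the scheme-less checkpoint path against the driver's default
// file system and report whether it already exists there.
// NOTE: FileSystem.get and exists throw IOException; handling omitted.
Configuration conf = sparkSession.sparkContext().hadoopConfiguration();
FileSystem fs = FileSystem.get(conf);
Path checkpoint = new Path("/usr/local/hadoop/checkpoint");
System.out.println(fs.makeQualified(checkpoint) + " exists: " + fs.exists(checkpoint));

If the directory turns out to be missing, it can be created up front with
"bin/hadoop fs -mkdir -p /usr/local/hadoop/checkpoint", although Spark
normally creates its checkpoint subdirectories itself.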

Thanks!

On Wed, May 24, 2017 at 11:39 PM, Shixiong(Ryan) Zhu <
shixiong@databricks.com> wrote:

> I meant using an HDFS command to check the directory, such as "bin/hadoop
> fs -ls /usr/local/hadoop/checkpoint". My hunch is that the default file
> system in the driver is probably the local file system. Could you add the
> following line to your code to print the default file system?
>
> println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration).getClass)
>
> On Wed, May 24, 2017 at 5:59 PM, kant kodali <ka...@gmail.com> wrote:
>
>> Hi All,
>>
>> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you can
>> see below; however, I don't see a checkpoint directory under my
>> hadoop_home = /usr/local/hadoop on either the datanodes or the namenodes,
>> although on the datanode machine there does seem to be some data under
>>
>> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.X-1495672725898/current/finalized/subdir0/subdir0
>>
>> I thought the checkpoint directory would be created by Spark once I
>> specify the path, but do I need to manually create the checkpoint dir
>> with mkdir on all the Spark worker machines? I am new to HDFS as well,
>> so please let me know. I can try sending df.explain("true"), but I have
>> 100 fields in my schema, so the Project node looks really big; if that
>> is not a problem for you guys, I can send it as well.
>>
>>
>>   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> X.X.X.X:9092, checkpointLocation -> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>>
>> *Here is the stack trace*
>>
>> [full stack trace snipped; identical to the trace quoted in full later in the thread]
>>
>>
>> On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>>> What's the value of "hdfsCheckPointDir"? Could you list this directory
>>> on HDFS and report the files there?
>>>
>>> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <
>>> michael@databricks.com> wrote:
>>>
>>>> -dev
>>>>
>>>> Have you tried clearing out the checkpoint directory?  Can you also
>>>> give the full stack trace?
>>>>
>>>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Even if I do simple count aggregation like below I get the same error
>>>>> as https://issues.apache.org/jira/browse/SPARK-19268
>>>>>
>>>>> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>>>>>
>>>>>
>>>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS
>>>>>> and Kafka
>>>>>>
>>>>>> I am running into the same problem as
>>>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my app (not
>>>>>> KafkaWordCount).
>>>>>>
>>>>>> Here is my sample code
>>>>>>
>>>>>> *Here is how I create ReadStream*
>>>>>>
>>>>>> sparkSession.readStream()
>>>>>>                 .format("kafka")
>>>>>>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>>>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>>>>>                 .option("startingOffsets", "earliest")
>>>>>>                 .option("failOnDataLoss", "false")
>>>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>>>                 .load();
>>>>>>
>>>>>>
>>>>>> *The core logic*
>>>>>>
>>>>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>>>>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>>>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>>>>>> query.awaitTermination();
>>>>>>
>>>>>>
>>>>>> I can also provide any other information you may need.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
I meant using an HDFS command to check the directory, such as "bin/hadoop
fs -ls /usr/local/hadoop/checkpoint". My hunch is that the default file
system in the driver is probably the local file system. Could you add the
following line to your code to print the default file system?

println(org.apache.hadoop.fs.FileSystem.get(sc.hadoopConfiguration).getClass)
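
For a Java application like the one in this thread, a roughly equivalent
check (assuming a SparkSession named sparkSession, as in the code quoted
below) would be:

// Print the concrete FileSystem class the driver resolves by default.
// NOTE: FileSystem.get throws IOException; handling omitted in this sketch.
System.out.println(
    org.apache.hadoop.fs.FileSystem.get(
        sparkSession.sparkContext().hadoopConfiguration()).getClass());

If this prints org.apache.hadoop.fs.LocalFileSystem rather than
org.apache.hadoop.hdfs.DistributedFileSystem, a scheme-less path such as
/usr/local/hadoop/checkpoint is being resolved against local disk rather
than HDFS.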

On Wed, May 24, 2017 at 5:59 PM, kant kodali <ka...@gmail.com> wrote:

> Hi All,
>
> I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you can
> see below; however, I don't see a checkpoint directory under my
> hadoop_home = /usr/local/hadoop on either the datanodes or the namenodes,
> although on the datanode machine there does seem to be some data under
>
> /usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.X-1495672725898/current/finalized/subdir0/subdir0
>
> I thought the checkpoint directory would be created by Spark once I
> specify the path, but do I need to manually create the checkpoint dir
> with mkdir on all the Spark worker machines? I am new to HDFS as well,
> so please let me know. I can try sending df.explain("true"), but I have
> 100 fields in my schema, so the Project node looks really big; if that
> is not a problem for you guys, I can send it as well.
>
>
>   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe -> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers -> X.X.X.X:9092, checkpointLocation -> /usr/local/hadoop/checkpoint, startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>
> *Here is the stack trace*
>
> StructField(OptionalContext4,StringType,true), StructField(OptionalContext5,StringType,true)),true), cast(value#1 as string)) AS payload#15]
>             +- StreamingExecutionRelation KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
>
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost task 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0): java.lang.IllegalStateException: Error reading delta file /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>     at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>     at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>     ... 21 more
> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>     ... 33 more
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>     at scala.Option.foreach(Option.scala:257)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
>     at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
>     at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:554)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:553)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:273)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:262)
>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:257)
>     ... 1 more
> Caused by: java.lang.IllegalStateException: Error reading delta file /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta of HDFSStateStoreProvider[id = (op=0, part=2), dir = /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta does not exist
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
>     at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
>     at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:99)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
>     at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>     at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
>     at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
>     at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
>     at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
>     at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
>     at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
>     at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
>     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
>     at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>     at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
>     ... 21 more
> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>     at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
>     at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>     at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
>     at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
>
>
> On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:
>
>> What's the value of "hdfsCheckPointDir"? Could you list this directory on
>> HDFS and report the files there?
>>
>> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <michael@databricks.com> wrote:
>>
>>> -dev
>>>
>>> Have you tried clearing out the checkpoint directory?  Can you also give
>>> the full stack trace?
>>>
>>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <ka...@gmail.com> wrote:
>>>
>>>> Even if I do simple count aggregation like below I get the same error
>>>> as https://issues.apache.org/jira/browse/SPARK-19268
>>>>
>>>> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>>>>
>>>>
>>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
>>>>> Kafka
>>>>>
>>>>> I am running into the same problem as
>>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my app(not KafkaWordCount).
>>>>>
>>>>> Here is my sample code
>>>>>
>>>>> *Here is how I create ReadStream*
>>>>>
>>>>> sparkSession.readStream()
>>>>>                 .format("kafka")
>>>>>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>>>>                 .option("startingOffsets", "earliest")
>>>>>                 .option("failOnDataLoss", "false")
>>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>>                 .load();
>>>>>
>>>>>
>>>>> *The core logic*
>>>>>
>>>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>>>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>>>>> query.awaitTermination();
>>>>>
>>>>>
>>>>> I can also provide any other information you may need.
>>>>>
>>>>> Thanks!
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by kant kodali <ka...@gmail.com>.
Hi All,

I specified hdfsCheckPointDir = /usr/local/hadoop/checkpoint as you can
see below. However, I don't see a checkpoint directory under my
hadoop_home = /usr/local/hadoop on either the datanodes or the namenode,
although on the datanode machine there does seem to be some data under

/usr/local/hadoop/hdfs/namenode/current/BP-1469808024-X.X.X.X-1495672725898/current/finalized/subdir0/subdir0

I thought the checkpoint directory would be created by Spark once I
specify the path, but do I need to manually create the checkpoint dir
with mkdir on all the Spark worker machines? I am new to HDFS as well,
so please let me know. I can also send df.explain(true), but I have 100
fields in my schema, so the Project node looks really big; if that is
not a problem for you I can send it as well.


  +- StreamingRelation
DataSource(org.apache.spark.sql.SparkSession@21002393,kafka,List(),None,List(),None,Map(subscribe
-> analytics2, failOnDataLoss -> false, kafka.bootstrap.servers ->
X.X.X.X:9092, checkpointLocation -> /usr/local/hadoop/checkpoint,
startingOffsets -> earliest),None), kafka, [key#0, value#1, topic#2,
partition#3, offset#4L, timestamp#5, timestampType#6]
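
(Side note on my HDFS confusion above: if I understand it correctly, a
path like /usr/local/hadoop/checkpoint lives inside HDFS itself, so it
would never show up under hadoop_home on any one machine's local disk.
A minimal sketch of probing it through the Hadoop FileSystem API,
assuming fs.defaultFS in core-site.xml points at the namenode; the
CheckpointProbe class name is just illustrative:)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckpointProbe {
    public static void main(String[] args) throws Exception {
        // Resolves against fs.defaultFS, i.e. HDFS itself, not the
        // local directory tree under /usr/local/hadoop on any node.
        FileSystem fs = FileSystem.get(new Configuration());
        Path checkpoint = new Path("/usr/local/hadoop/checkpoint");
        if (!fs.exists(checkpoint)) {
            System.out.println("No such path in HDFS: " + checkpoint);
            return;
        }
        for (FileStatus status : fs.listStatus(checkpoint)) {
            System.out.println(status.getPath() + "  " + status.getLen() + " bytes");
        }
    }
}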

*Here is the stack trace*

StructField(OptionalContext4,StringType,true),
StructField(OptionalContext5,StringType,true)),true), cast(value#1 as
string)) AS payload#15]
            +- StreamingExecutionRelation
KafkaSource[Subscribe[analytics2]], [key#0, value#1, topic#2,
partition#3, offset#4L, timestamp#5, timestampType#6]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:305)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 2 in stage 3.0 failed 4 times, most recent failure: Lost
task 2.3 in stage 3.0 (TID 222, 172.31.25.189, executor 0):
java.lang.IllegalStateException: Error reading delta file
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
of HDFSStateStoreProvider[id = (op=0, part=2), dir =
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]:
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
does not exist
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
    at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist:
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

    at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
    ... 21 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
File does not exist:
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
    at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
    ... 33 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:924)
    at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:554)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:554)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:553)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:273)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:262)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:278)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:49)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:262)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:257)
    ... 1 more
Caused by: java.lang.IllegalStateException: Error reading delta file
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
of HDFSStateStoreProvider[id = (op=0, part=2), dir =
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2]:
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
does not exist
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:365)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:317)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:314)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:314)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:313)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$loadMap(HDFSBackedStateStoreProvider.scala:313)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:220)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:186)
    at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:61)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist:
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

    at sun.reflect.GeneratedConstructorAccessor13.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1228)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1213)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1201)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:306)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:272)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:264)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1526)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:304)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:299)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:312)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:362)
    ... 21 more
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
File does not exist:
/tmp/temporary-b1516035-a0e6-4a37-9a47-19d55cb5d787/state/0/2/1.delta
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:588)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy15.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
    at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy16.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1226)
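
(One thing that stands out in the trace: the missing delta file lives
under a /tmp/temporary-* directory, not under the
/usr/local/hadoop/checkpoint path I configured. If checkpointLocation
is a writer-side option rather than a reader-side one, which I believe
it is, then setting it on readStream() would simply be ignored and
Spark would fall back to a temporary checkpoint directory. A minimal
sketch of what I mean, reusing the same df1, KafkaSink and
hdfsCheckPointDir from my earlier snippet:)

// Hedged sketch: checkpointLocation moved onto the writer, which is
// where the streaming query actually picks it up from.
StreamingQuery query = df1.writeStream()
        .foreach(new KafkaSink())
        .outputMode("update")
        .option("checkpointLocation", hdfsCheckPointDir)
        .start();
query.awaitTermination();

If that is the actual cause, anything cleaning up /tmp underneath the
state store would also match the FileNotFoundException above.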


On Wed, May 24, 2017 at 4:29 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:

> What's the value of "hdfsCheckPointDir"? Could you list this directory on
> HDFS and report the files there?
>
> On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <mi...@databricks.com>
> wrote:
>
>> -dev
>>
>> Have you tried clearing out the checkpoint directory?  Can you also give
>> the full stack trace?
>>
>> On Wed, May 24, 2017 at 3:45 PM, kant kodali <ka...@gmail.com> wrote:
>>
>>> Even if I do simple count aggregation like below I get the same error as
>>> https://issues.apache.org/jira/browse/SPARK-19268
>>>
>>> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>>>
>>>
>>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <ka...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
>>>> Kafka
>>>>
>>>> I am running into the same problem as
>>>> https://issues.apache.org/jira/browse/SPARK-19268 with my app(not KafkaWordCount).
>>>>
>>>> Here is my sample code
>>>>
>>>> *Here is how I create ReadStream*
>>>>
>>>> sparkSession.readStream()
>>>>                 .format("kafka")
>>>>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>>>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>>>                 .option("startingOffsets", "earliest")
>>>>                 .option("failOnDataLoss", "false")
>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>                 .load();
>>>>
>>>>
>>>> *The core logic*
>>>>
>>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>>>> query.awaitTermination();
>>>>
>>>>
>>>> I can also provide any other information you may need.
>>>>
>>>> Thanks!
>>>>
>>>
>>>
>>
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
What's the value of "hdfsCheckPointDir"? Could you list this directory on
HDFS and report the files there?
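
If it is easier from code, something like this minimal sketch with the
Hadoop API would dump every file under the state directory with its
size (the path and class name are illustrative, based on the checkpoint
location mentioned earlier in the thread):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListStateFiles {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Recursively walk everything under the checkpoint's state dir.
        RemoteIterator<LocatedFileStatus> it =
                fs.listFiles(new Path("/usr/local/hadoop/checkpoint/state"), true);
        while (it.hasNext()) {
            LocatedFileStatus f = it.next();
            System.out.println(f.getPath() + "  " + f.getLen() + " bytes");
        }
    }
}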

On Wed, May 24, 2017 at 3:50 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> -dev
>
> Have you tried clearing out the checkpoint directory?  Can you also give
> the full stack trace?
>
> On Wed, May 24, 2017 at 3:45 PM, kant kodali <ka...@gmail.com> wrote:
>
>> Even if I do simple count aggregation like below I get the same error as
>> https://issues.apache.org/jira/browse/SPARK-19268
>>
>> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>>
>>
>> On Wed, May 24, 2017 at 3:35 PM, kant kodali <ka...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
>>> Kafka
>>>
>>> I am running into the same problem as
>>> https://issues.apache.org/jira/browse/SPARK-19268 with my app(not KafkaWordCount).
>>>
>>> Here is my sample code
>>>
>>> *Here is how I create ReadStream*
>>>
>>> sparkSession.readStream()
>>>                 .format("kafka")
>>>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>>                 .option("startingOffsets", "earliest")
>>>                 .option("failOnDataLoss", "false")
>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>                 .load();
>>>
>>>
>>> *The core logic*
>>>
>>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>>> query.awaitTermination();
>>>
>>>
>>> I can also provide any other information you may need.
>>>
>>> Thanks!
>>>
>>
>>
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by Michael Armbrust <mi...@databricks.com>.
-dev

Have you tried clearing out the checkpoint directory?  Can you also give
the full stack trace?
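
(Clearing it amounts to a recursive delete of the checkpoint root; a
minimal sketch with the Hadoop API, assuming the directory from your
reader options. Note this throws away all offset and state progress,
so the query starts over from scratch:)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ClearCheckpoint {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // true => recursive: wipes the offsets/ and state/
        // subdirectories under the checkpoint root as well.
        fs.delete(new Path("/usr/local/hadoop/checkpoint"), true);
    }
}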

On Wed, May 24, 2017 at 3:45 PM, kant kodali <ka...@gmail.com> wrote:

> Even if I do simple count aggregation like below I get the same error as
> https://issues.apache.org/jira/browse/SPARK-19268
>
> Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
>
>
> On Wed, May 24, 2017 at 3:35 PM, kant kodali <ka...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
>> Kafka
>>
>> I am running into the same problem as
>> https://issues.apache.org/jira/browse/SPARK-19268 with my app(not KafkaWordCount).
>>
>> Here is my sample code
>>
>> *Here is how I create ReadStream*
>>
>> sparkSession.readStream()
>>                 .format("kafka")
>>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>>                 .option("startingOffsets", "earliest")
>>                 .option("failOnDataLoss", "false")
>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>                 .load();
>>
>>
>> *The core logic*
>>
>> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
>> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
>> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
>> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
>> query.awaitTermination();
>>
>>
>> I can also provide any other information you may need.
>>
>> Thanks!
>>
>
>

Re: Running into the same problem as JIRA SPARK-19268

Posted by kant kodali <ka...@gmail.com>.
Even if I do a simple count aggregation like the one below, I get the
same error as https://issues.apache.org/jira/browse/SPARK-19268

Dataset<Row> df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();


On Wed, May 24, 2017 at 3:35 PM, kant kodali <ka...@gmail.com> wrote:

> Hi All,
>
> I am using Spark 2.1.1 and running in a Standalone mode using HDFS and
> Kafka
>
> I am running into the same problem as
> https://issues.apache.org/jira/browse/SPARK-19268 with my app(not KafkaWordCount).
>
> Here is my sample code
>
> *Here is how I create ReadStream*
>
> sparkSession.readStream()
>                 .format("kafka")
>                 .option("kafka.bootstrap.servers", config.getString("kafka.consumer.settings.bootstrapServers"))
>                 .option("subscribe", config.getString("kafka.consumer.settings.topicName"))
>                 .option("startingOffsets", "earliest")
>                 .option("failOnDataLoss", "false")
>                 .option("checkpointLocation", hdfsCheckPointDir)
>                 .load();
>
>
> *The core logic*
>
> Dataset<Row> df = ds.select(from_json(new Column("value").cast("string"), client.getSchema()).as("payload"));
> Dataset<Row> df1 = df.selectExpr("payload.info.*", "payload.data.*");
> Dataset<Row> df2 = df1.groupBy(window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).agg(sum("Amount"));
> StreamingQuery query = df1.writeStream().foreach(new KafkaSink()).outputMode("update").start();
> query.awaitTermination();
>
>
> I can also provide any other information you may need.
>
> Thanks!
>
