Posted to user@spark.apache.org by N B <nb...@gmail.com> on 2017/06/20 01:23:15 UTC

Flume DStream produces 0 records after HDFS node killed

Hi all,

We are running a standalone Spark cluster for a streaming application.
The application consumes data from Flume using a Flume polling stream
created as follows:

flumeStream = FlumeUtils.createPollingStream(streamingContext,
    socketAddress.toArray(new InetSocketAddress[socketAddress.size()]),
    StorageLevel.MEMORY_AND_DISK_SER(), 100, 5);


The checkpoint directory is configured to be on an HDFS cluster and Spark
workers have their SPARK_LOCAL_DIRS and SPARK_WORKER_DIR defined to be on
their respective local filesystems.

We are seeing some odd behavior that we are unable to explain. During
normal operation, everything runs as expected, with Flume delivering events
to Spark. However, if I kill one of the HDFS nodes while the application is
running (it does not matter which one), the Flume Receiver in Spark stops
delivering any data to the downstream processing.

I enabled debug logging for org.apache.spark.streaming.flume on the Spark
worker nodes and looked at the logs of the worker that runs the Flume
Receiver. It keeps receiving data from Flume, as shown in the log sample
below, but the resulting batches in the stream start containing 0 records
as soon as the HDFS node is killed, with no errors logged to indicate any
issue.

17/06/20 01:05:42 DEBUG FlumeBatchFetcher: Ack sent for sequence number: 09fa05f59050
17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Received batch of 100 events with sequence number: 09fa05f59052
17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Sending ack for sequence number: 09fa05f59052
17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Ack sent for sequence number: 09fa05f59052
17/06/20 01:05:47 DEBUG FlumeBatchFetcher: Received batch of 100 events with sequence number: 09fa05f59054
17/06/20 01:05:47 DEBUG FlumeBatchFetcher: Sending ack for sequence number: 09fa05f59054

The driver output for the application shows (printed via
Dstream.count().map().print()):

-------------------------------------------
Time: 1497920770000 ms
-------------------------------------------
Received 0     flume events.


Any insights about where to look in order to find the root cause will be
greatly appreciated.

Thanks
N B

Re: Flume DStream produces 0 records after HDFS node killed

Posted by N B <nb...@gmail.com>.
This issue got resolved.

I was able to trace it to the fact that the driver program's pom.xml was
pulling in Spark 2.1.1 which in turn was pulling in Hadoop 2.2.0.
Explicitly adding dependencies on Hadoop libraries 2.7.3 resolves it.
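
For anyone hitting something similar, a quick way to see which Hadoop client
the driver actually loads is to ask the classpath directly. The following is
only a minimal sketch: the class name HadoopClientVersionCheck is made up for
illustration, and it assumes that org.apache.hadoop.util.VersionInfo (which
exposes a static getVersion() method) is the class to query. It uses
reflection so that it also runs when no Hadoop jar is present.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical helper, not part of Spark or Hadoop.
public class HadoopClientVersionCheck {

    // Calls the static no-arg getVersion() method of the named class, if it exists.
    static Optional<String> versionOf(String className) {
        try {
            Class<?> cls = Class.forName(className);
            Method getVersion = cls.getMethod("getVersion");
            return Optional.ofNullable((String) getVersion.invoke(null));
        } catch (ReflectiveOperationException | LinkageError e) {
            // Class or method is missing, or the class failed to load.
            return Optional.empty();
        }
    }

    // Reports the jar or directory the named class was loaded from, if known.
    static Optional<String> locationOf(String className) {
        try {
            CodeSource src = Class.forName(className).getProtectionDomain().getCodeSource();
            return src == null ? Optional.empty() : Optional.of(src.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Assumption: VersionInfo is the class that reports the client version.
        String cls = "org.apache.hadoop.util.VersionInfo";
        System.out.println("Hadoop client version: " + versionOf(cls).orElse("not on classpath"));
        System.out.println("Loaded from: " + locationOf(cls).orElse("unknown"));
    }
}
```

Run with the same classpath as the driver; a version that differs from the
one the HDFS cluster runs points to a transitive dependency like the one
described above.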

The HDFS server-side call DatanodeManager.getDatanodeStorageInfos() is
incompatible between clients using HDFS client 2.2.0 and HDFS servers
running some later versions (2.7.3 in our case). I believe this should be
raised with the HDFS project, since the 2.2.0 APIs and protocols were
supposed to remain compatible according to the Hadoop 2.2.0 release blurb:

From http://hadoop.apache.org/releases.html :

"
15 October, 2013: Release 2.2.0 available

Apache Hadoop 2.2.0 is the *GA* release of Apache Hadoop 2.x.

Users are encouraged to immediately move to 2.2.0 since this release is
significantly more stable and is guaranteed to remain compatible in terms
of both APIs and protocols.
"

Thanks
N B



Re: Flume DStream produces 0 records after HDFS node killed

Posted by N B <nb...@gmail.com>.
Hadoop version 2.7.3

On Tue, Jun 20, 2017 at 11:12 PM, yohann jardin <yo...@hotmail.com>
wrote:

> Which version of Hadoop are you running on?

Re: Flume DStream produces 0 records after HDFS node killed

Posted by yohann jardin <yo...@hotmail.com>.
Which version of Hadoop are you running on?

Yohann Jardin

On Tue, Jun 20, 2017 at 10:24 AM, N B <nb...@gmail.com> wrote:
BTW, this is running on Spark 2.1.1.

I have been trying to debug this issue, and what I have found so far is that it is somehow related to the Spark WAL. The directory named <checkpoint dir in HDFS>/receivedBlockMetadata seems to stop getting written to once an HDFS node has been killed and restarted. I have a couple of questions about this:

1. Why is the Flume receiver even writing to the WAL? I have not enabled the 'spark.streaming.receiver.writeAheadLog.enable' property, and even after I set it explicitly to false in the driver, the WAL still seems to be written to.

2. Why would the receiver receive metadata but not write to the WAL after an HDFS node is lost and restarted? HDFS replication factor is at its default of 2.

Thanks
N B


Re: Flume DStream produces 0 records after HDFS node killed

Posted by N B <nb...@gmail.com>.
OK, here is some more info about this issue, in case someone can shed light
on what could be going on. I turned on debug logging for
org.apache.spark.streaming.scheduler in the driver process. The warning
below is what gets logged, and it keeps being logged even after the downed
HDFS node is restarted. Using Spark 2.1.1 and HDFS 2.7.3 here.

2017-06-20 22:38:11,302 WARN JobGenerator ReceivedBlockTracker.logWarning -
Exception thrown while writing record: BatchCleanupEvent(ArrayBuffer()) to
the WriteAheadLog.
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
at
org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
at
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
at
org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:171)
at
org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:233)
at org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(JobGenerator.scala:287)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:187)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException): Missing storageIDs: It is likely that the HDFS client, who made this call, is running in an older version of Hadoop which does not support storageIDs. datanodeID.length=1, src=/vm/spark-checkpoint/receivedBlockMetadata/log-1497997390799-1497997450799, fileId=0, blk=BP-1450953312-10.0.0.9-1490120290209:blk_1081472520_7731872, clientName=DFSClient_NONMAPREDUCE_-23097586_1, clientMachine=10.0.0.17
at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:514)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3353)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:759)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:515)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:352)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:919)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1031)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)
polygraph-engine 2017-06-20 22:38:11,302 WARN JobGenerator ReceivedBlockTracker.logWarning - Failed to acknowledge batch clean up in the Write Ahead Log.
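
For anyone reading the trace: the file named in the `src=` field of the exception appears to follow a `log-<start>-<end>` pattern. Assuming both numbers are epoch milliseconds (an assumption about the WAL segment naming, not something the log itself states), the window that segment covers can be decoded with a small stand-alone helper. `WalSegmentName` is a hypothetical class written for this illustration and is not part of Spark or Hadoop.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Spark API): decodes a "log-<start>-<end>" file name. */
final class WalSegmentName {
    // Assumption: both numeric fields are epoch timestamps in milliseconds.
    private static final Pattern NAME = Pattern.compile("log-(\\d+)-(\\d+)");

    final Instant start;
    final Instant end;

    private WalSegmentName(Instant start, Instant end) {
        this.start = start;
        this.end = end;
    }

    /** Accepts a bare file name or a full path; only the last path component is parsed. */
    static WalSegmentName parse(String path) {
        String fileName = path.substring(path.lastIndexOf('/') + 1);
        Matcher m = NAME.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a log-<start>-<end> name: " + fileName);
        }
        return new WalSegmentName(
                Instant.ofEpochMilli(Long.parseLong(m.group(1))),
                Instant.ofEpochMilli(Long.parseLong(m.group(2))));
    }

    /** Length of the time window encoded in the file name. */
    Duration length() {
        return Duration.between(start, end);
    }

    public static void main(String[] args) {
        // Path copied from the src= field of the RemoteException above.
        WalSegmentName s = parse(
                "/vm/spark-checkpoint/receivedBlockMetadata/log-1497997390799-1497997450799");
        System.out.println(s.start + " .. " + s.end + " (" + s.length().getSeconds() + " s)");
    }
}
```

Under that assumption the segment spans 60 seconds and starts at 22:23:10 UTC on 2017-06-20, roughly 15 minutes before the WARN line above, which can help line up the failing write with the time the HDFS node was killed.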

Thanks
N B


On Tue, Jun 20, 2017 at 10:24 AM, N B <nb...@gmail.com> wrote:

> BTW, this is running on Spark 2.1.1.
>
> I have been trying to debug this issue and what I have found till now is
> that it is somehow related to the Spark WAL. The directory named
> <checkpoint dir in HDFS>/receivedBlockMetadata seems to stop getting
> written to after the point of an HDFS node being killed and restarted. I
> have a couple questions around this :
>
> 1. Why is the flume receiver even writing to the WAL? I have not enabled
> the 'spark.streaming.receiver.writeAheadLog.enable' property and even
> after I set it explicitly to false in the driver, the WAL seems to be
> getting written to.
>
> 2. Why would the receiver receive metadata but not write to the WAL after
> an HDFS node is lost and restarted? HDFS replication factor is at its
> default of 2.
>
> Thanks
> N B

Re: Flume DStream produces 0 records after HDFS node killed

Posted by N B <nb...@gmail.com>.
BTW, this is running on Spark 2.1.1.

I have been trying to debug this issue, and what I have found so far is
that it is somehow related to the Spark WAL. The directory named
<checkpoint dir in HDFS>/receivedBlockMetadata seems to stop being
written to once an HDFS node has been killed and restarted. I
have a couple of questions about this:

1. Why is the Flume receiver even writing to the WAL? I have not enabled
the 'spark.streaming.receiver.writeAheadLog.enable' property, and even after
I set it explicitly to false in the driver, the WAL still seems to be
written to.

2. Why would the receiver receive metadata but not write to the WAL after an
HDFS node is lost and restarted? HDFS replication factor is at its default
of 2.

Thanks
N B

