Posted to dev@flink.apache.org by Mu Kong <ko...@gmail.com> on 2017/06/12 01:01:00 UTC

[QUESTION] OutOfMemoryError when writing into HDFS

Hi all,

Thanks a lot for your work for the community!

This question is more of a discussion.
Currently, I'm experiencing an *OutOfMemoryError* when writing files
from Kafka into HDFS using *BucketingSink*.
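
For context, the job is essentially a Kafka source chained straight into a
BucketingSink with a StringWriter. Below is a minimal sketch of that shape;
the topic, paths, bucketing format and Kafka properties are placeholders,
not our exact code:

    import java.util.Properties;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.StringWriter;
    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
    import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaToHdfsJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Kafka 0.8 consumer (the stack trace below goes through SimpleConsumerThread)
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
            props.setProperty("zookeeper.connect", "zk:2181");     // placeholder
            props.setProperty("group.id", "hdfs-writer");          // placeholder
            FlinkKafkaConsumer08<String> source =
                    new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props);

            // BucketingSink writing plain strings into time-bucketed HDFS directories
            BucketingSink<String> sink = new BucketingSink<>("hdfs:///data/events");  // placeholder path
            sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
            sink.setWriter(new StringWriter<String>());

            env.addSource(source).addSink(sink);
            env.execute("kafka-to-hdfs");
        }
    }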

The log looks like this:

2017-06-10 08:58:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Garbage collector stats: [G1 Young Generation, GC TIME (ms): 33283, GC
COUNT: 977], [G1 Old Generation, GC TIME (ms): 325, GC COUNT: 2]
2017-06-10 08:59:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Memory
usage stats: [HEAP: 14080/20480/20480 MB, NON HEAP: 81/83/-1 MB
(used/committed/max)]
2017-06-10 08:59:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 16846, Total Capacity: 443738663, Used Memory:
443738664
2017-06-10 08:59:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Off-heap pool stats: [Code Cache: 24/25/240 MB (used/committed/max)],
[Metaspace: 50/51/-1 MB (used/committed/max)], [Compressed Class
Space: 6/6/1024 MB (used/committed/max)]
2017-06-10 08:59:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Garbage collector stats: [G1 Young Generation, GC TIME (ms): 33311, GC
COUNT: 978], [G1 Old Generation, GC TIME (ms): 325, GC COUNT: 2]
2017-06-10 09:00:20,485 INFO
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl
 - backgroundOperationsLoop exiting
2017-06-10 09:00:20,488 INFO  org.apache.zookeeper.ZooKeeper
                     - Session: 0x55591b10666ea92 closed
2017-06-10 09:00:20,488 INFO  org.apache.zookeeper.ClientCnxn
                     - EventThread shut down
2017-06-10 09:00:34,999 INFO
org.apache.flink.shaded.org.apache.curator.framework.imps.CuratorFrameworkImpl
 - backgroundOperationsLoop exiting
2017-06-10 09:00:35,001 INFO  org.apache.zookeeper.ZooKeeper
                     - Session: 0x55591b10666ea94 closed
2017-06-10 09:00:35,001 INFO  org.apache.zookeeper.ClientCnxn
                     - EventThread shut down
2017-06-10 09:00:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Memory
usage stats: [HEAP: 12820/20480/20480 MB, NON HEAP: 81/83/-1 MB
(used/committed/max)]
2017-06-10 09:00:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              - Direct
memory stats: Count: 17438, Total Capacity: 458405794, Used Memory:
458405795
2017-06-10 09:00:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Off-heap pool stats: [Code Cache: 25/25/240 MB (used/committed/max)],
[Metaspace: 50/51/-1 MB (used/committed/max)], [Compressed Class
Space: 6/6/1024 MB (used/committed/max)]
2017-06-10 09:00:42,248 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Garbage collector stats: [G1 Young Generation, GC TIME (ms): 33339, GC
COUNT: 979], [G1 Old Generation, GC TIME (ms): 325, GC COUNT: 2]
2017-06-10 09:01:04,962 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Source: Custom Source -> Sink: Unnamed (7/22)
(57d3c79ae13fd06de79ca6cb8f1431b4) switched from RUNNING to FAILED.
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:714)
    at org.apache.hadoop.hdfs.DFSOutputStream.start(DFSOutputStream.java:2170)
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1685)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
    at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:909)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:890)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787)
    at org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:126)
    at org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:62)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:228)
    at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:385)
2017-06-10 09:01:04,982 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Freeing task resources for Source: Custom Source -> Sink: Unnamed
(7/22) (57d3c79ae13fd06de79ca6cb8f1431b4).
2017-06-10 09:01:04,989 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Ensuring all FileSystem streams are closed for task Source: Custom
Source -> Sink: Unnamed (7/22) (57d3c79ae13fd06de79ca6cb8f1431b4)
[FAILED]
2017-06-10 09:01:04,989 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Un-registering task and sending final execution state FAILED to
JobManager for task Source: Custom Source -> Sink: Unnamed
(57d3c79ae13fd06de79ca6cb8f1431b4)
2017-06-10 09:01:05,025 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Attempting to cancel task Source: Custom Source -> Sink: Unnamed
(1/22) (f64b613bcb366952d716d57913e01acf).
2017-06-10 09:01:05,025 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Source: Custom Source -> Sink: Unnamed (1/22)
(f64b613bcb366952d716d57913e01acf) switched from RUNNING to CANCELING.
2017-06-10 09:01:05,025 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Triggering cancellation of task code Source: Custom Source -> Sink:
Unnamed (1/22) (f64b613bcb366952d716d57913e01acf).
2017-06-10 09:01:05,033 INFO
org.apache.flink.runtime.taskmanager.Task                     -
Attempting to cancel task Source: Custom Source -> Sink: Unnamed
(4/22) (956689ad000ce02f128dc3147641736c).
2017-06-10 09:01:05,033 INFO  org.apache.flink.runtime.taskmanager.Task


As the memory monitoring suggests, there is still plenty of free memory in
the heap, so I'm not sure whether this should really be an OutOfMemoryError.
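
As far as I can tell, this particular variant ("unable to create new native
thread") is thrown when the JVM asks the OS for a new thread and the OS
refuses (for example when the per-user process/thread limit from ulimit -u
is reached), so it can show up even when the heap is mostly free. A tiny
standalone program like the one below (just an illustration, not taken from
our job) reproduces the same message once that limit is hit:

    // Illustration only: exhaust the OS thread limit to trigger
    // "java.lang.OutOfMemoryError: unable to create new native thread"
    // while the heap stays almost empty.
    // WARNING: run this only in an isolated environment.
    public class NativeThreadExhaustion {
        public static void main(String[] args) {
            long created = 0;
            try {
                while (true) {
                    Thread t = new Thread(() -> {
                        try {
                            Thread.sleep(Long.MAX_VALUE);  // park the thread forever
                        } catch (InterruptedException ignored) {
                        }
                    });
                    t.start();
                    created++;
                }
            } catch (OutOfMemoryError e) {
                System.err.println("Threads created: " + created
                        + ", error: " + e.getMessage());
            }
        }
    }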

I am using fs.hdfs.hadoopconf to set up my HDFS client; is there any
possibility that this error originates on the HDFS client side?

If so, maybe we should change the error message a little bit?

Re: [QUESTION] OutOfMemoryError when writing into HDFS

Posted by Mu Kong <ko...@gmail.com>.
Hi Ted,

Thanks for the useful link there.

Actually, I just found out that the error might be triggered by a Kerberos
issue in our system.
I'll use the ticket cache and run kinit from crontab to see if that is the
case.

But thanks for the link you provided. I'll definitely check it out :)

Best regards,
Mu

On Mon, Jun 12, 2017 at 10:45 AM, Ted Yu <yu...@gmail.com> wrote:

> Can you see if the following post helps in troubleshooting ?
>
> https://blog.fastthread.io/2016/07/06/troubleshoot-outofmemoryerror-unable-to-create-new-native-thread/
>
> Thanks
>
> On Sun, Jun 11, 2017 at 6:01 PM, Mu Kong <ko...@gmail.com> wrote:
> > [...]

Re: [QUESTION] OutOfMemoryError when writing into HDFS

Posted by Ted Yu <yu...@gmail.com>.
Can you see if the following post helps in troubleshooting ?

https://blog.fastthread.io/2016/07/06/troubleshoot-outofmemoryerror-unable-to-create-new-native-thread/

Thanks

On Sun, Jun 11, 2017 at 6:01 PM, Mu Kong <ko...@gmail.com> wrote:

> [...]