You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Biswajit Das <bi...@gmail.com> on 2017/08/23 19:05:29 UTC

custom writer fail to recover

Hi There ,

I'm using a custom writer with an hourly Rolling Bucket sink. I'm seeing two
issues.

First issue: if I write the files to S3, all of them
get committed; however, when I write the same files to HDFS, they remain in the
.pending state. This could be related to the second problem below.

Second issue: my custom writer writes Avro records to Parquet. The writer
extends BaseStreamWriter and looks something like this:


  @transient private var writer: ParquetWriter[T] = _

  override def open(fs: FileSystem, path: Path): Unit = {
    val conf = new Configuration()
    conf.setBoolean(ADD_LIST_ELEMENT_RECORDS, false)
    conf.setBoolean(WRITE_OLD_LIST_STRUCTURE, false)
    writer = AvroParquetWriter
      .builder[T](path)
      .withSchema(new Schema.Parser().parse(schema))
      .withCompressionCodec(compressionCodecName)
      .withConf(conf)
      .build()
  }

  override def write(element: T): Unit = writer.write(element)

  override def duplicate(): Writer[T] = new AvroParquetSinkWriter[T](schema)

  override def close(): Unit = writer.close()

  override def getPos: Long = writer.getDataSize

  override def flush(): Long = super.flush()


What I noticed is that during recovery from a checkpoint it fails to flush,
although I have overridden flush (see above). The issue seems to be that
it doesn't have a handle to the stream writer, which is why it fails when flush
is called on the stream writer. I'm not sure whether the first (.pending
state) issue is also related to this.


--------------------------------------------------
11:52:04.082 [pool-13-thread-1] INFO  o.a.flink.runtime.taskmanager.Task -
Source: eo_open- kafka source (1/1) (d926613dfcb5ac993a362e9b985e40d6)
switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.AsynchronousException:
java.lang.Exception: Could not materialize checkpoint 4 for operator
Source:- kafka source (1/1).
    at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
    at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[na:1.8.0_73]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[na:1.8.0_73]
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[na:1.8.0_73]
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[na:1.8.0_73]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
Caused by: java.lang.Exception: Could not materialize checkpoint 4 for
operator Source: eo_open- kafka source (1/1).
    ... 6 common frames omitted
Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
Could not flush and close the file system output stream to
hdfs://XXXX:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c
in order to obtain the stream state handle
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
[na:1.8.0_73]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
[na:1.8.0_73]
    at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
~[flink-core-1.3.2.jar:1.3.2]
    at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906)
~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
    ... 5 common frames omitted
    Suppressed: java.lang.Exception: Could not properly cancel managed
operator state future.
        at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98)
~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
        at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
        ... 5 common frames omitted
    Caused by: java.util.concurrent.ExecutionException:
java.io.IOException: Could not flush and close the file system output
stream to
hdfs://xxx:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c
in order to obtain the stream state handle
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at
org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
        ... 7 common frames omitted
    Caused by: java.io.IOException: Could not flush and close the file
system output stream to
hdfs://XXX:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c
in order to obtain the stream state handle
        at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
        at
org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
        at
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:270)
        at
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
        at
org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:288)
        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:521)
        at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:112)
        at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1185)
        ... 5 common frames omitted
    Caused by: org.apache.hadoop.net.ConnectTimeoutException: 60000 millis
timeout while waiting for channel to be ready for connect. ch :
java.nio.channels.SocketChannel[connection-pending remote=/XXXX:50010]
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
        at
org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1341)


        -------------------------------------

Re: custom writer fail to recover

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

Which version of Flink is this? It cannot recover because it expects more data to have been written than is there, which seems to indicate that flushing did not work correctly.

Best,
Aljoscha

> On 19. Dec 2017, at 00:40, xiatao123 <ta...@udacity.com> wrote:
> 
> Hi Das,
>  Have you got your .pending issue resolved? I am running into the same
> issue where the parquet files are all in pending status.
>  Please help to share your solutions.
> Thanks,
> Tao
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: custom writer fail to recover

Posted by xiatao123 <ta...@udacity.com>.
Hi Das,
  Have you got your .pending issue resolved? I am running into the same
issue where the parquet files are all in pending status.
  Please help to share your solutions.
Thanks,
Tao



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: custom writer fail to recover

Posted by Biswajit Das <bi...@gmail.com>.
Hi Stefan ,

My bad, I'm really sorry. I copied the wrong exception stack trace. During the
recovery after the error I'm seeing the exception below:


Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException):
Cannot truncate to a larger file size. Current size: 31132385, truncate
size: 35985787.
    at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2089)
    at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2027)
    at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:1997)
    at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:930)
    at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:599)
    at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy12.truncate(Unknown Source)
    at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:313)
    at sun.reflect.GeneratedMethodAccessor309.invoke(Unknown Source)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy13.truncate(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:2016)
    at
org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:689)
    at
org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:685)
    at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at
org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:696)
    ... 15 more
2017-08-24 20:22:44,005 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink:
Unnamed (9/16) (af635c8938168acfc85542c830d71002) switched from RUNNING to
FAILED.
java.lang.RuntimeException: Could not invoke truncate.
    at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:846)
    at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handleRestoredBucketState(BucketingSink.java:718)
    at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:370)
    at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:177)
    at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:159)
    at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:105)
    at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:251)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:675)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:662)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor308.invoke(Unknown Source)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.handlePendingInProgressFile(BucketingSink.java:808)
    ... 11 more
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException):
Cannot truncate to a larger file size. Current size: 38029300, truncate
size: 44601803.
    at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInternal(FSNamesystem.java:2089)
    at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncateInt(FSNamesystem.java:2027)
    at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.truncate(FSNamesystem.java:1997)
    at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.truncate(NameNodeRpcServer.java:930)
    at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.truncate(ClientNamenodeProtocolServerSideTranslatorPB.java:599)
    at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy12.truncate(Unknown Source)
    at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.truncate(ClientNamenodeProtocolTranslatorPB.java:313)
    at sun.reflect.GeneratedMethodAccessor309.invoke(Unknown Source)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy13.truncate(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.truncate(DFSClient.java:2016)
    at
org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:689)
    at
org.apache.hadoop.hdfs.DistributedFileSystem$13.doCall(DistributedFileSystem.java:685)
    at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at
org.apache.hadoop.hdfs.DistributedFileSystem.truncate(DistributedFileSystem.java:696)
    ... 15 more


On Thu, Aug 24, 2017 at 4:25 AM, Stefan Richter <s.richter@data-artisans.com
> wrote:

> Hi,
>
> I think there are two different things mixed up in your analysis. The
> stack trace that you provided is caused by a failing checkpoint - in
> writing, not in reading. It seems to fail from a Timeout of your HDFS
> connection. This close method has also nothing to do with the close method
> in the writer. It is the close method of the CheckpointOutputStream.
> Furthermore, „could not materialize checkpoint“ seems to happen in cancel,
> so if the checkpoint got canceled that means this is an effect and not the
> cause. There should be another exception further up in the logs that gives
> the real reason why the checkpoint was canceled.
>
> Nevertheless, the timeout is strange and you should check if your DFS is
> properly configured and running as expected. The reported exception should
> have no direct connection with your ParquetWriter. It is possible that the
> checkpoint was canceled because some problem happened in the ParquetWriter,
> but then we are looking at the wrong stack trace.
>
> As for the pending files, different DFS implementations could have
> different points where flush() is called. I think your implementation also
> properly forward to writer.flush?
>
> Best,
> Stefan
>
> > Am 23.08.2017 um 21:05 schrieb Biswajit Das <bi...@gmail.com>:
> >
> > Hi There ,
> >
> > I'm using custom writer with hourly Rolling Bucket sink . I'm seeing two
> issue
> >
> > first one if write the same file on s3 all the files
> > gets committed , however when I write the same on HDFS I see its remains
> on .pending state , could be related to second problem below
> >
> > Second issue : My custom writer is writing Avro to parquet and writer is
> something like this extended from BaseStreamWriter
> >
> >
> >   @transient private var writer: ParquetWriter[T] = _
> >
> >   override def open(fs: FileSystem, path: Path): Unit = {
> >     val conf = new Configuration()
> >     conf.setBoolean(ADD_LIST_ELEMENT_RECORDS, false)
> >     conf.setBoolean(WRITE_OLD_LIST_STRUCTURE, false)
> >     writer = AvroParquetWriter
> >       .builder[T](path)
> >       .withSchema(new Schema.Parser().parse(schema))
> >       .withCompressionCodec(compressionCodecName)
> >       .withConf(conf)
> >       .build()
> >   }
> >
> >   override def write(element: T): Unit = writer.write(element)
> >
> >   override def duplicate(): Writer[T] = new AvroParquetSinkWriter[T](
> schema)
> >
> >   override def close(): Unit = writer.close()
> >
> >   override def getPos: Long = writer.getDataSize
> >
> >   override def flush(): Long = super.flush()
> >
> >
> > What I noticed during recovering from checkpoint it fails to flush ,
> although I have overriden flush ^^ above . The issue seems
> > it doesn't have handle of stream writer that's why it is failing when
> flush call for stream writer , not sure if first .pedning
> > state is related to this also .
> >
> >
> > --------------------------------------------------
> > 11:52:04.082 [pool-13-thread-1] INFO  o.a.flink.runtime.taskmanager.Task
> - Source: eo_open- kafka source (1/1) (d926613dfcb5ac993a362e9b985e40d6)
> switched from RUNNING to FAILED.
> > org.apache.flink.streaming.runtime.tasks.AsynchronousException:
> java.lang.Exception: Could not materialize checkpoint 4 for operator
> Source:- kafka source (1/1).
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:970)
> ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [na:1.8.0_73]
> >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [na:1.8.0_73]
> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_73]
> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_73]
> >     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
> > Caused by: java.lang.Exception: Could not materialize checkpoint 4 for
> operator Source: eo_open- kafka source (1/1).
> >     ... 6 common frames omitted
> > Caused by: java.util.concurrent.ExecutionException:
> java.io.IOException: Could not flush and close the file system output
> stream to hdfs://XXXX:8020/checkpoint/data/das/
> 2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c
> in order to obtain the stream state handle
> >     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> [na:1.8.0_73]
> >     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> [na:1.8.0_73]
> >     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> ~[flink-core-1.3.2.jar:1.3.2]
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:906)
> ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >     ... 5 common frames omitted
> >     Suppressed: java.lang.Exception: Could not properly cancel managed
> operator state future.
> >         at org.apache.flink.streaming.api.operators.
> OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98)
> ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
> ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$
> AsyncCheckpointRunnable.run(StreamTask.java:961)
> ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
> >         ... 5 common frames omitted
> >     Caused by: java.util.concurrent.ExecutionException:
> java.io.IOException: Could not flush and close the file system output
> stream to hdfs://xxx:8020/checkpoint/data/das/
> 2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c
> in order to obtain the stream state handle
> >         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(
> FutureUtil.java:43)
> >         at org.apache.flink.runtime.state.StateUtil.
> discardStateFuture(StateUtil.java:85)
> >         at org.apache.flink.streaming.api.operators.
> OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
> >         ... 7 common frames omitted
> >     Caused by: java.io.IOException: Could not flush and close the file
> system output stream to hdfs://XXX:8020/checkpoint/data/das/
> 2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c
> in order to obtain the stream state handle
> >         at org.apache.flink.runtime.state.filesystem.
> FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(
> FsCheckpointStreamFactory.java:336)
> >         at org.apache.flink.runtime.checkpoint.
> AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(
> AbstractAsyncSnapshotIOCallable.java:100)
> >         at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.
> performOperation(DefaultOperatorStateBackend.java:270)
> >         at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.
> performOperation(DefaultOperatorStateBackend.java:233)
> >         at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.
> call(AbstractAsyncIOCallable.java:72)
> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >         at org.apache.flink.runtime.state.DefaultOperatorStateBackend.
> snapshot(DefaultOperatorStateBackend.java:288)
> >         at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$
> CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$
> CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> checkpointState(StreamTask.java:654)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> performCheckpoint(StreamTask.java:590)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> triggerCheckpoint(StreamTask.java:521)
> >         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.
> triggerCheckpoint(SourceStreamTask.java:112)
> >         at org.apache.flink.runtime.taskmanager.Task$3.run(Task.
> java:1185)
> >         ... 5 common frames omitted
> >     Caused by: org.apache.hadoop.net.ConnectTimeoutException: 60000
> millis timeout while waiting for channel to be ready for connect. ch :
> java.nio.channels.SocketChannel[connection-pending remote=/XXXX:50010]
> >         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
> >         at org.apache.hadoop.hdfs.DFSOutputStream.
> createSocketForPipeline(DFSOutputStream.java:1341)
> >
> >
> >         -------------------------------------
>
>

Re: custom writer fail to recover

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I think there are two different things mixed up in your analysis. The stack trace that you provided is caused by a failing checkpoint - in writing, not in reading. It seems to fail because of a timeout on your HDFS connection. This close method also has nothing to do with the close method in the writer. It is the close method of the CheckpointOutputStream. Furthermore, „could not materialize checkpoint“ seems to happen in cancel, so if the checkpoint got canceled that means this is an effect and not the cause. There should be another exception further up in the logs that gives the real reason why the checkpoint was canceled.

Nevertheless, the timeout is strange and you should check if your DFS is properly configured and running as expected. The reported exception should have no direct connection with your ParquetWriter. It is possible that the checkpoint was canceled because some problem happened in the ParquetWriter, but then we are looking at the wrong stack trace.

As for the pending files, different DFS implementations could have different points where flush() is called. I assume your implementation also properly forwards to writer.flush?

Best,
Stefan

> Am 23.08.2017 um 21:05 schrieb Biswajit Das <bi...@gmail.com>:
> 
> Hi There ,
> 
> I'm using custom writer with hourly Rolling Bucket sink . I'm seeing two issue 
> 
> first one if write the same file on s3 all the files
> gets committed , however when I write the same on HDFS I see its remains on .pending state , could be related to second problem below 
> 
> Second issue : My custom writer is writing Avro to parquet and writer is something like this extended from BaseStreamWriter 
> 
> 
>   @transient private var writer: ParquetWriter[T] = _
> 
>   override def open(fs: FileSystem, path: Path): Unit = {
>     val conf = new Configuration()
>     conf.setBoolean(ADD_LIST_ELEMENT_RECORDS, false)
>     conf.setBoolean(WRITE_OLD_LIST_STRUCTURE, false)
>     writer = AvroParquetWriter
>       .builder[T](path)
>       .withSchema(new Schema.Parser().parse(schema))
>       .withCompressionCodec(compressionCodecName)
>       .withConf(conf)
>       .build()
>   }
> 
>   override def write(element: T): Unit = writer.write(element)
> 
>   override def duplicate(): Writer[T] = new AvroParquetSinkWriter[T](schema)
> 
>   override def close(): Unit = writer.close()
> 
>   override def getPos: Long = writer.getDataSize
> 
>   override def flush(): Long = super.flush()
> 
> 
> What I noticed during recovering from checkpoint it fails to flush , although I have overriden flush ^^ above . The issue seems 
> it doesn't have handle of stream writer that's why it is failing when flush call for stream writer , not sure if first .pedning 
> state is related to this also .
> 
> 
> --------------------------------------------------
> 11:52:04.082 [pool-13-thread-1] INFO  o.a.flink.runtime.taskmanager.Task - Source: eo_open- kafka source (1/1) (d926613dfcb5ac993a362e9b985e40d6) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 4 for operator Source:- kafka source (1/1).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_73]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_73]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73]
>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]
> Caused by: java.lang.Exception: Could not materialize checkpoint 4 for operator Source: eo_open- kafka source (1/1).
>     ... 6 common frames omitted
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://XXXX:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_73]
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192) [na:1.8.0_73]
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) ~[flink-core-1.3.2.jar:1.3.2]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:906) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
>     ... 5 common frames omitted
>     Suppressed: java.lang.Exception: Could not properly cancel managed operator state future.
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:98) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961) ~[flink-streaming-java_2.10-1.3.2.jar:1.3.2]
>         ... 5 common frames omitted
>     Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://xxx:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>         at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>         at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:96)
>         ... 7 common frames omitted
>     Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://XXX:8020/checkpoint/data/das/2aecebbb9d46fb2231edb4b5bd00f6fc/chk-4/e33e1f23-c834-4c96-8708-22e648114d6c in order to obtain the stream state handle
>         at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>         at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
>         at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:270)
>         at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
>         at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:288)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:392)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1162)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1094)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:654)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:590)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:521)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:112)
>         at org.apache.flink.runtime.taskmanager.Task$3.run(Task.java:1185)
>         ... 5 common frames omitted
>     Caused by: org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/XXXX:50010]
>         at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
>         at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1341)
>         
>         
>         -------------------------------------