You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by vipul singh <ne...@gmail.com> on 2017/10/20 06:24:45 UTC

Custom Sink Checkpointing errors

Hello all,

I am working on a custom sink implementation, but having weird issues with
checkpointing.

I am using a custom ListState to checkpoint, and it looks like this:

private var checkpointMessages: ListState[Bucket] =_


My snapshot function looks like:

@throws[IOException]
def snapshotState(context: FunctionSnapshotContext): Unit = {
  checkpointMessages.clear()
      for((bucketName, bucket) <- bufferedMessages) {

        // cloning to avoid any conncurrent modification issues
        var new_buffer = new ListBuffer[GenericRecord]()

        bucket.buffer.foreach(f=> new_buffer += f)

        val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)

        if(shouldUpload(bucketName)) uploadFile (bucketName)
        else checkpointMessages.add(new_bucket)
      }}

where class bucket is:

@SerialVersionUID(1L)
class Bucket(var name: String, var buffer: ListBuffer[GenericRecord],
var timestamp: Long) extends Serializable{
  def this(name: String) = {
    this(name, ListBuffer[GenericRecord](), new Date().getTime)
  }
}


BufferredMessages signature is

private val bufferedMessages = collection.mutable.Map[String, Bucket]()


The basic idea behind this implementation is I maintain multiple buffers,
and push messages(org.apache.avro.generic.GenericRecord) during the @invoke
section of the sink, upon reaching certain thresholds I archive these on s3.

I try to run this both locally in intellij and on a cluster:

On Intellij the process runs for a bit( checkpoints 3-4 times) and then
error out with the exception below:


# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232, tid=0x0000000000003903
#
# JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build
1.8.0_131-b11)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode
bsd-amd64 compressed oops)
# Problematic frame:
# V  [libjvm.dylib+0x46440c]
#
# Core dump written. Default location: /cores/core or core.25232
#
# An error report file with more information is saved as:
# hs_err_pid25232.log
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
# The crash happened outside the Java Virtual Machine in native code.
# See problematic frame for where to report the bug.
#
Disconnected from the target VM, address: '127.0.0.1:60979', transport:
'socket'

Process finished with exit code 134 (interrupted by signal 6: SIGABRT)

I managed to collect a core dump:
https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a.

On a cluster I start to set concurrent serialization issues:
https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47

My initial guess is this is happening due to the size of the ListState? but
i checked the number of records are around ~10k in the buffer. Due to the
nature of the application, we have to implement this in a custom sink.

Could someone please help me/ guide me to troubleshoot this further.

-- 
Thanking in advance,
Vipul

Re: Custom Sink Checkpointing errors

Posted by vipul singh <ne...@gmail.com>.
Thanks Stefan. I found the issue in my application. Everything is working
as excepted now.
Once again thanks for the help and advice.

On Fri, Oct 20, 2017 at 4:51 AM, vipul singh <ne...@gmail.com> wrote:

> Thanks Stefan for the answers. The serialization is happening during the
> creation of snapshot state. I have added a gist with a larger stacktrace(
> https://gist.github.com/neoeahit/aee5562bf0b8d8d02e2a012f6735d850). I am
> not using any serializer, in the custom sink.
>
> We have
>
> src.keyBy(m => (m.topic, m.partition))
>     .map(message => updateMessage(message, config))
> .addSink(new CustomSink(config)).uid(FLINK_JOB_ID).setParallelism(src.parallelism)
> .name(FLINK_JOB_ID)
>
> So there should be a 1-1 source and sink mapping, i am assuming.
>
> If possible could you could please give some more pointers to help
> troubleshoot
>
> Thanks,
> Vipul
>
>
> On Fri, Oct 20, 2017 at 2:58 AM, Stefan Richter <
> s.richter@data-artisans.com> wrote:
>
>> Hi,
>>
>> the crash looks unrelated to Flink code from the dump’s trace. Since it
>> happens somewhere in managing a jar file, it might be related to this:
>> https://bugs.openjdk.java.net/browse/JDK-8142508 , point (2). Maybe your
>> jar gets overwritten while running, e.g. from your IDE?
>>
>> The serialization exception looks like the custom sink is using the same
>> serializer in different threads concurrently. I don’t have the full custom
>> code but this would be my guess. Ensure to duplicate serializers whenever
>> different threads could work on them, e.g. processing vs checkpointing.
>>
>> Best,
>> Stefan
>>
>>
>>
>>
>> Am 20.10.2017 um 14:24 schrieb vipul singh <ne...@gmail.com>:
>>
>> Hello all,
>>
>> I am working on a custom sink implementation, but having weird issues
>> with checkpointing.
>>
>> I am using a custom ListState to checkpoint, and it looks like this:
>>
>> private var checkpointMessages: ListState[Bucket] =_
>>
>>
>> My snapshot function looks like:
>>
>> @throws[IOException]
>> def snapshotState(context: FunctionSnapshotContext): Unit = {
>>   checkpointMessages.clear()
>>       for((bucketName, bucket) <- bufferedMessages) {
>>
>>         // cloning to avoid any conncurrent modification issues
>>         var new_buffer = new ListBuffer[GenericRecord]()
>>
>>         bucket.buffer.foreach(f=> new_buffer += f)
>>
>>         val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)
>>
>>         if(shouldUpload(bucketName)) uploadFile (bucketName)
>>         else checkpointMessages.add(new_bucket)
>>       }}
>>
>> where class bucket is:
>>
>> @SerialVersionUID(1L)
>> class Bucket(var name: String, var buffer: ListBuffer[GenericRecord], var timestamp: Long) extends Serializable{
>>   def this(name: String) = {
>>     this(name, ListBuffer[GenericRecord](), new Date().getTime)
>>   }
>> }
>>
>>
>> BufferredMessages signature is
>>
>> private val bufferedMessages = collection.mutable.Map[String, Bucket]()
>>
>>
>> The basic idea behind this implementation is I maintain multiple buffers,
>> and push messages(org.apache.avro.generic.GenericRecord) during the
>> @invoke section of the sink, upon reaching certain thresholds I archive
>> these on s3.
>>
>> I try to run this both locally in intellij and on a cluster:
>>
>> On Intellij the process runs for a bit( checkpoints 3-4 times) and then
>> error out with the exception below:
>>
>>
>> # A fatal error has been detected by the Java Runtime Environment:
>> #
>> #  SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232,
>> tid=0x0000000000003903
>> #
>> # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build
>> 1.8.0_131-b11)
>> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode
>> bsd-amd64 compressed oops)
>> # Problematic frame:
>> # V  [libjvm.dylib+0x46440c]
>> #
>> # Core dump written. Default location: /cores/core or core.25232
>> #
>> # An error report file with more information is saved as:
>> # hs_err_pid25232.log
>> #
>> # If you would like to submit a bug report, please visit:
>> #   http://bugreport.java.com/bugreport/crash.jsp
>> # The crash happened outside the Java Virtual Machine in native code.
>> # See problematic frame for where to report the bug.
>> #
>> Disconnected from the target VM, address: '127.0.0.1:60979', transport:
>> 'socket'
>>
>> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
>>
>> I managed to collect a core dump: https://gist.github.com/
>> neoeahit/38a02955c1de7501561fba2e593d5f6a.
>>
>> On a cluster I start to set concurrent serialization issues:
>> https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47
>>
>> My initial guess is this is happening due to the size of the ListState?
>> but i checked the number of records are around ~10k in the buffer. Due to
>> the nature of the application, we have to implement this in a custom sink.
>>
>> Could someone please help me/ guide me to troubleshoot this further.
>>
>> --
>> Thanking in advance,
>> Vipul
>>
>>
>>
>
>
> --
> Thanks,
> Vipul
>



-- 
Thanks,
Vipul

Re: Custom Sink Checkpointing errors

Posted by vipul singh <ne...@gmail.com>.
Thanks Stefan for the answers. The serialization is happening during the
creation of snapshot state. I have added a gist with a larger stacktrace(
https://gist.github.com/neoeahit/aee5562bf0b8d8d02e2a012f6735d850). I am
not using any serializer, in the custom sink.

We have

src.keyBy(m => (m.topic, m.partition))
    .map(message => updateMessage(message, config))
.addSink(new CustomSink(config)).uid(FLINK_JOB_ID).setParallelism(src.parallelism)
.name(FLINK_JOB_ID)

So there should be a 1-1 source and sink mapping, i am assuming.

If possible could you could please give some more pointers to help
troubleshoot

Thanks,
Vipul


On Fri, Oct 20, 2017 at 2:58 AM, Stefan Richter <s.richter@data-artisans.com
> wrote:

> Hi,
>
> the crash looks unrelated to Flink code from the dump’s trace. Since it
> happens somewhere in managing a jar file, it might be related to this:
> https://bugs.openjdk.java.net/browse/JDK-8142508 , point (2). Maybe your
> jar gets overwritten while running, e.g. from your IDE?
>
> The serialization exception looks like the custom sink is using the same
> serializer in different threads concurrently. I don’t have the full custom
> code but this would be my guess. Ensure to duplicate serializers whenever
> different threads could work on them, e.g. processing vs checkpointing.
>
> Best,
> Stefan
>
>
>
>
> Am 20.10.2017 um 14:24 schrieb vipul singh <ne...@gmail.com>:
>
> Hello all,
>
> I am working on a custom sink implementation, but having weird issues with
> checkpointing.
>
> I am using a custom ListState to checkpoint, and it looks like this:
>
> private var checkpointMessages: ListState[Bucket] =_
>
>
> My snapshot function looks like:
>
> @throws[IOException]
> def snapshotState(context: FunctionSnapshotContext): Unit = {
>   checkpointMessages.clear()
>       for((bucketName, bucket) <- bufferedMessages) {
>
>         // cloning to avoid any conncurrent modification issues
>         var new_buffer = new ListBuffer[GenericRecord]()
>
>         bucket.buffer.foreach(f=> new_buffer += f)
>
>         val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)
>
>         if(shouldUpload(bucketName)) uploadFile (bucketName)
>         else checkpointMessages.add(new_bucket)
>       }}
>
> where class bucket is:
>
> @SerialVersionUID(1L)
> class Bucket(var name: String, var buffer: ListBuffer[GenericRecord], var timestamp: Long) extends Serializable{
>   def this(name: String) = {
>     this(name, ListBuffer[GenericRecord](), new Date().getTime)
>   }
> }
>
>
> BufferredMessages signature is
>
> private val bufferedMessages = collection.mutable.Map[String, Bucket]()
>
>
> The basic idea behind this implementation is I maintain multiple buffers,
> and push messages(org.apache.avro.generic.GenericRecord) during the
> @invoke section of the sink, upon reaching certain thresholds I archive
> these on s3.
>
> I try to run this both locally in intellij and on a cluster:
>
> On Intellij the process runs for a bit( checkpoints 3-4 times) and then
> error out with the exception below:
>
>
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232,
> tid=0x0000000000003903
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build
> 1.8.0_131-b11)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode
> bsd-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.dylib+0x46440c]
> #
> # Core dump written. Default location: /cores/core or core.25232
> #
> # An error report file with more information is saved as:
> # hs_err_pid25232.log
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> Disconnected from the target VM, address: '127.0.0.1:60979', transport:
> 'socket'
>
> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
>
> I managed to collect a core dump: https://gist.github.com/neoeahit/
> 38a02955c1de7501561fba2e593d5f6a.
>
> On a cluster I start to set concurrent serialization issues:
> https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47
>
> My initial guess is this is happening due to the size of the ListState?
> but i checked the number of records are around ~10k in the buffer. Due to
> the nature of the application, we have to implement this in a custom sink.
>
> Could someone please help me/ guide me to troubleshoot this further.
>
> --
> Thanking in advance,
> Vipul
>
>
>


-- 
Thanks,
Vipul

Re: Custom Sink Checkpointing errors

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

the crash looks unrelated to Flink code from the dump’s trace. Since it happens somewhere in managing a jar file, it might be related to this: https://bugs.openjdk.java.net/browse/JDK-8142508 <https://bugs.openjdk.java.net/browse/JDK-8142508> , point (2). Maybe your jar gets overwritten while running, e.g. from your IDE?

The serialization exception looks like the custom sink is using the same serializer in different threads concurrently. I don’t have the full custom code but this would be my guess. Ensure to duplicate serializers whenever different threads could work on them, e.g. processing vs checkpointing.

Best,
Stefan


 
> Am 20.10.2017 um 14:24 schrieb vipul singh <ne...@gmail.com>:
> 
> Hello all,
> 
> I am working on a custom sink implementation, but having weird issues with checkpointing.
> 
> I am using a custom ListState to checkpoint, and it looks like this:
> private var checkpointMessages: ListState[Bucket] =_
> 
> My snapshot function looks like:
> 
> @throws[IOException]
> def snapshotState(context: FunctionSnapshotContext): Unit = {
>   checkpointMessages.clear()
>       for((bucketName, bucket) <- bufferedMessages) {
> 
>         // cloning to avoid any conncurrent modification issues
>         var new_buffer = new ListBuffer[GenericRecord]()
> 
>         bucket.buffer.foreach(f=> new_buffer += f)
> 
>         val new_bucket = new Bucket(bucketName, new_buffer, bucket.timestamp)
> 
>         if(shouldUpload(bucketName)) uploadFile (bucketName)
>         else checkpointMessages.add(new_bucket)
>       }}
> where class bucket is:
> @SerialVersionUID(1L)
> class Bucket(var name: String, var buffer: ListBuffer[GenericRecord], var timestamp: Long) extends Serializable{
>   def this(name: String) = {
>     this(name, ListBuffer[GenericRecord](), new Date().getTime)
>   }
> }
> 
> BufferredMessages signature is 
> private val bufferedMessages = collection.mutable.Map[String, Bucket]()
> 
> The basic idea behind this implementation is I maintain multiple buffers, and push messages(org.apache.avro.generic.GenericRecord) during the @invoke section of the sink, upon reaching certain thresholds I archive these on s3.
> 
> I try to run this both locally in intellij and on a cluster:
> 
> On Intellij the process runs for a bit( checkpoints 3-4 times) and then error out with the exception below:
> 
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x000000010d46440c, pid=25232, tid=0x0000000000003903
> #
> # JRE version: Java(TM) SE Runtime Environment (8.0_131-b11) (build 1.8.0_131-b11)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode bsd-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.dylib+0x46440c]
> #
> # Core dump written. Default location: /cores/core or core.25232
> #
> # An error report file with more information is saved as:
> # hs_err_pid25232.log
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp <http://bugreport.java.com/bugreport/crash.jsp>
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> Disconnected from the target VM, address: '127.0.0.1:60979 <http://127.0.0.1:60979/>', transport: 'socket'
> 
> Process finished with exit code 134 (interrupted by signal 6: SIGABRT)
> 
> I managed to collect a core dump: https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a <https://gist.github.com/neoeahit/38a02955c1de7501561fba2e593d5f6a>. 
> 
> On a cluster I start to set concurrent serialization issues: https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47 <https://gist.github.com/neoeahit/75a078f3672dd4c234c5cd25eba05c47>
> 
> My initial guess is this is happening due to the size of the ListState? but i checked the number of records are around ~10k in the buffer. Due to the nature of the application, we have to implement this in a custom sink.
> 
> Could someone please help me/ guide me to troubleshoot this further.
> 
> -- 
> Thanking in advance,
> Vipul