Posted to user@flink.apache.org by Alexey Trenikhun <ye...@msn.com> on 2022/04/10 20:39:05 UTC

Broadcast state corrupted ?

Hello,
We have a KeyedBroadcastProcessFunction with broadcast state MapStateDescriptor<String, PbCfgTenantDictionary>, where PbCfgTenantDictionary is a Protobuf type for which we have a custom TypeInformation/TypeSerializer. In one of our environments, we can't restore the job from a savepoint because the state data seems to be corrupted. I've added logging to the TypeSerializer:

public void serialize(T t, DataOutputView dataOutputView) throws IOException {
    final byte[] data = t.toByteArray();
    dataOutputView.writeInt(data.length);
    dataOutputView.write(data);
    if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
        LOG.info("serialize PbCfgTenantDictionary.size: {}", data.length);
        LOG.info("serialize PbCfgTenantDictionary.data: {}",
            Base64.getEncoder().encodeToString(data));
    }
}

public T deserialize(DataInputView dataInputView) throws IOException {
    final int serializedSize = dataInputView.readInt();
    final com.google.protobuf.Parser<T> parser = Unchecked.cast(prototype.getParserForType());
    final byte[] data = new byte[serializedSize];
    dataInputView.read(data);
    if (PbCfgTenantDictionary.class.equals(prototype.getClass())) {
        LOG.info("deserialize PbCfgTenantDictionary.size: {}", data.length);
        LOG.info("deserialize PbCfgTenantDictionary.data: {}",
            Base64.getEncoder().encodeToString(data));
    }
    return parser.parseFrom(data);
}

Both the serialize and deserialize methods print the same size, 104048, but the data is different: after 4980 base64 characters (3735 bytes) there are only AAAAAAAAAAAAAAAAAAAA....A=
Strangely, the problem affects only 1 of the 4 environments I've tried.
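
The figures quoted above can be sanity-checked with the JDK's java.util.Base64 alone. The sketch below is only an illustration; the class and method names (Base64ZeroCheck, bytesForBase64Chars, encodeZeros) are hypothetical and are not taken from the job's code. It shows that 4980 Base64 characters cover 3735 bytes, and that zero bytes encode to runs of 'A', so a tail of AAAA...A= suggests the rest of the buffer is zero-filled.

```java
import java.util.Base64;

final class Base64ZeroCheck {

    /** Bytes covered by a run of complete 4-character Base64 groups (3 bytes each). */
    static int bytesForBase64Chars(int chars) {
        return chars / 4 * 3;
    }

    /** Base64 encoding of n zero (NUL) bytes. */
    static String encodeZeros(int n) {
        return Base64.getEncoder().encodeToString(new byte[n]);
    }

    public static void main(String[] args) {
        // 4980 characters are 1245 full groups of 4, i.e. 1245 * 3 bytes.
        System.out.println(bytesForBase64Chars(4980));
        // Zero bytes encode to 'A' characters, with '=' padding at the end if needed.
        System.out.println(encodeZeros(5));
    }
}
```

Since 104048 is not a multiple of 3, a buffer of that size whose tail is all zeros would be expected to end in padded output such as A=, which is consistent with the log line described above.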

Re: Broadcast state corrupted ?

Posted by Alexey Trenikhun <ye...@msn.com>.
I think I’ve found the cause: dataInputView.read(data) can read partial data; it returns the number of bytes actually stored in the buffer. If I use dataInputView.readFully(data) instead, the problem disappears.

Thanks,
Alexey
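
The difference between read and readFully can be reproduced without Flink. The sketch below is a minimal illustration using only java.io; it relies on the java.io.DataInput contract for readFully, and the names PartialReadDemo and ChunkedInputStream are hypothetical. ChunkedInputStream simulates a source that hands back at most a few bytes per call, which is one plausible way a remote filesystem stream could behave; it is an assumption, not a description of what MinIO or the s3p filesystem actually does.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

final class PartialReadDemo {

    /** Hypothetical stream that returns at most maxChunk bytes per read call. */
    static final class ChunkedInputStream extends FilterInputStream {
        private final int maxChunk;

        ChunkedInputStream(InputStream in, int maxChunk) {
            super(in);
            this.maxChunk = maxChunk;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, maxChunk));
        }
    }

    private static DataInputStream open(byte[] source, int maxChunk) {
        return new DataInputStream(
            new ChunkedInputStream(new ByteArrayInputStream(source), maxChunk));
    }

    /** Mirrors the original deserialize(): one read() call, return value ignored. */
    static byte[] readOnce(byte[] source, int maxChunk) throws IOException {
        byte[] data = new byte[source.length];
        open(source, maxChunk).read(data); // may fill only part of the buffer
        return data;
    }

    /** Uses readFully(), which keeps reading until the buffer is full or throws EOFException. */
    static byte[] readAll(byte[] source, int maxChunk) throws IOException {
        byte[] data = new byte[source.length];
        open(source, maxChunk).readFully(data);
        return data;
    }

    public static void main(String[] args) throws IOException {
        byte[] source = new byte[16];
        Arrays.fill(source, (byte) 1);
        System.out.println(Arrays.toString(readOnce(source, 5)));
        System.out.println(Arrays.toString(readAll(source, 5)));
    }
}
```

With a 16-byte source and chunks of 5, readOnce fills only the first 5 bytes and leaves the rest at zero, while readAll returns the full content. In the serializer, the corresponding change is the one described in this message: replacing dataInputView.read(data) with dataInputView.readFully(data).
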
________________________________
From: Alexey Trenikhun <ye...@msn.com>
Sent: Wednesday, April 13, 2022 6:44:05 PM
To: Chesnay Schepler <ch...@apache.org>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Broadcast state corrupted ?


  *   Failing environment is using MinIO.
  *   We use s3p:// filesystem
  *   I don’t see errors in the Job Manager log:

{"timestamp":"2022-04-14T00:14:13.358Z","message":"Triggering savepoint for job 00000000000000000000000000000000.","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"flink-akka.actor.default-dispatcher-53","level":"INFO","level_value":20000}
{"timestamp":"2022-04-14T00:14:13.372Z","message":"Triggering checkpoint 22636 (type=SAVEPOINT) @ 1649895253360 for job 00000000000000000000000000000000.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"Checkpoint Timer","level":"INFO","level_value":20000}
{"timestamp":"2022-04-14T00:14:14.966Z","message":"Completed checkpoint 22636 for job 00000000000000000000000000000000 (34858233 bytes in 1532 ms).","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"jobmanager-future-thread-1","level":"INFO","level_value":20000}


  *   I tried cancel with savepoint and cancel from the UI. It doesn't seem to depend on shutdown; the log above is for a savepoint taken without shutdown. I also can't read this savepoint using the state API. The size is 104048, but it looks like the bytes after 456 are 0.
  *   It happens reliably in 1 environment.

Thanks,
Alexey
________________________________
From: Chesnay Schepler <ch...@apache.org>
Sent: Wednesday, April 13, 2022 8:23 AM
To: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Broadcast state corrupted ?

Is the failing environment using Azure, or MinIO? Which Flink filesystem did you use?
Were there any errors in the job that took this savepoint? How was the cluster/job shut down?
Does this happen reliably in the 1 environment, or only once? (did you try to reproduce it?)

AFAIK sequences of AAA correspond to NUL bytes.

I'm wondering if that would mean that part of the data wasn't written properly.
Currently my theory is that either:
a) some data wasn't flushed to the file (i.e., the savepoint was never completely on disk)
b) some user operation (like copying to another directory) corrupted the file
c) some hardware issue corrupted the file.

On 13/04/2022 16:50, Alexey Trenikhun wrote:
Any suggestions on how to troubleshoot the issue? I can still reproduce the problem in environment A.

Thanks,
Alexey
________________________________
From: Alexey Trenikhun <ye...@msn.com>
Sent: Tuesday, April 12, 2022 7:10:17 AM
To: Chesnay Schepler <ch...@apache.org>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Broadcast state corrupted ?

I’ve tried to restore the job in environment A (where we observe the problem) from a savepoint taken in environment B, and it restored fine. So it looks like something in environment A corrupts the savepoint.
________________________________
From: Alexey Trenikhun <ye...@msn.com>
Sent: Monday, April 11, 2022 7:10:51 AM
To: Chesnay Schepler <ch...@apache.org>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Broadcast state corrupted ?

I didn’t try the same savepoint across environments. The operator with broadcast state was added recently. I rolled back all environments, created savepoints with the old version, and upgraded to the version with broadcast state; all 4 upgraded fine. I then took savepoints in each environment and tried to restore from them: 3 restored and the 4th failed (the same environment as the original failure). Two environments are deployed in Azure AKS and use Azure Blob Storage; the other two are local and use MinIO. The failure happens in one of the local environments.
________________________________
From: Chesnay Schepler <ch...@apache.org>
Sent: Monday, April 11, 2022 2:28:48 AM
To: Alexey Trenikhun <ye...@msn.com>; Flink User Mail List <us...@flink.apache.org>
Subject: Re: Broadcast state corrupted ?

Am I understanding things correctly in that the same savepoint cannot be restored from in 1 environment, while it works fine in 3 others?
If so, are they all relying on the same file, or copies of the savepoint?


Re: Broadcast state corrupted ?

Posted by Alexey Trenikhun <ye...@msn.com>.
  *   Failing environment is using MinIO.
  *   We use s3p:// filesystem
  *   I don’t see errors in the Job Manager log:

{"timestamp":"2022-04-14T00:14:13.358Z","message":"Triggering savepoint for job 00000000000000000000000000000000.","logger_name":"org.apache.flink.runtime.jobmaster.JobMaster","thread_name":"flink-akka.actor.default-dispatcher-53","level":"INFO","level_value":20000}
{"timestamp":"2022-04-14T00:14:13.372Z","message":"Triggering checkpoint 22636 (type=SAVEPOINT) @ 1649895253360 for job 00000000000000000000000000000000.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"Checkpoint Timer","level":"INFO","level_value":20000}
{"timestamp":"2022-04-14T00:14:14.966Z","message":"Completed checkpoint 22636 for job 00000000000000000000000000000000 (34858233 bytes in 1532 ms).","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"jobmanager-future-thread-1","level":"INFO","level_value":20000}


  *   I tried cancel with savepoint and cancel from the UI. It doesn't seem to depend on shutdown; the log above is for a savepoint taken without shutdown. I also can't read this savepoint using the state API. The size is 104048, but it looks like the bytes after 456 are 0.
  *   It happens reliably in 1 environment.

Thanks,
Alexey

Re: Broadcast state corrupted ?

Posted by Chesnay Schepler <ch...@apache.org>.
Is the failing environment using Azure, or MinIO? Which Flink filesystem 
did you use?
Were there any errors in the job that took this savepoint? How was the 
cluster/job shut down?
Does this happen reliably in the 1 environment, or only once? (did you 
try to reproduce it?)

AFAIK sequences of AAA correspond to NUL bytes.

I'm wondering if that would mean that part of the data wasn't written 
properly.
Currently my theory is that either:
a) some data wasn't flushed to the file (i.e., the savepoint was never 
completely on disk)
b) some user operation (like copying to another directory) corrupted the 
file
c) some hardware issue corrupted the file.


Re: Broadcast state corrupted ?

Posted by Alexey Trenikhun <ye...@msn.com>.
Any suggestions on how to troubleshoot the issue? I can still reproduce the problem in environment A.

Thanks,
Alexey

Re: Broadcast state corrupted ?

Posted by Alexey Trenikhun <ye...@msn.com>.
I’ve tried to restore the job in environment A (where we observe the problem) from a savepoint taken in environment B, and it restored fine. So it looks like something in environment A corrupts the savepoint.

Re: Broadcast state corrupted ?

Posted by Alexey Trenikhun <ye...@msn.com>.
I didn’t try the same savepoint across environments. The operator with broadcast state was added recently. I rolled back all environments, created savepoints with the old version, and upgraded to the version with broadcast state; all 4 upgraded fine. I then took savepoints in each environment and tried to restore from them: 3 restored and the 4th failed (the same environment as the original failure). Two environments are deployed in Azure AKS and use Azure Blob Storage; the other two are local and use MinIO. The failure happens in one of the local environments.

Re: Broadcast state corrupted ?

Posted by Chesnay Schepler <ch...@apache.org>.
Am I understanding things correctly in that the same savepoint cannot be 
restored from in 1 environment, while it works fine in 3 others?
If so, are they all relying on the same file, or copies of the savepoint?
