Posted to user@flink.apache.org by tao xiao <xi...@gmail.com> on 2022/11/15 12:50:13 UTC

Any way to improve list state get performance

Hi team,

I have a Flink job that joins two streams, say A and B, followed by a
keyed process function. In the keyed process function, the job adds
elements from stream B to a list state if the element from stream A
hasn't arrived yet. Is there any way to skip the liststate.get() call
that checks whether the list state holds elements when an element from
stream A arrives, to reduce the calls to the underlying state backend
(RocksDB)?

Here is the code snippet:

keyfunction {

  process(in, ctx, collector) {
    if (in is A stream) {
      // any way to check whether the list state is empty, so that we
      // don't need to call get()?
      for (b : liststate.get()) {
        .....
      }
    }

    if (in is B stream) {
      liststate.add(in)
    }
  }
}


-- 
Regards,
Tao

RE: "Authentication failed" in "ConnectionState" when enabling internal SSL on Yarn with self signed certificate

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Last update:
My Flink version is in fact 1.14.3. The application works when internal SSL is enabled in “local” intra-JVM cluster mode, so the certificate seems correct.
I see no log on the Yarn server side, only that the application gets killed.
I will try to take stack traces…

From: LINZ, Arnaud
Sent: Tuesday, 22 November 2022 17:41
To: user <us...@flink.apache.org>
Subject: RE: "Authentication failed" in "ConnectionState" when enabling internal SSL on Yarn with self signed certificate

Update:
In fact, this “Authentication failed” message also appears when SSL is turned off (and the Yarn application succeeds), so it is more of a warning and is unrelated to the “freeze” that occurs when SSL is turned on.

Thus, when internal SSL is enabled, I have no error in the Yarn log, and the only error I get is a timeout error like the one you get when you don’t have enough resources:
(NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout)
But I do have enough resources.

From: LINZ, Arnaud
Sent: Tuesday, 22 November 2022 17:18
To: user <us...@flink.apache.org>
Subject: "Authentication failed" in "ConnectionState" when enabling internal SSL on Yarn with self signed certificate

Hello,
I use Flink 1.14.3 in Yarn cluster mode.
I’ve followed the instructions listed here (https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-ssl/) to turn on internal SSL:

$ keytool -genkeypair \
  -alias flink.internal \
  -keystore internal.keystore \
  -dname "CN=flink.internal" \
  -storepass internal_store_password \
  -keyalg RSA \
  -keysize 4096 \
  -storetype PKCS12

security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/flink/conf/internal.keystore
security.ssl.internal.truststore: /path/to/flink/conf/internal.keystore
security.ssl.internal.keystore-password: internal_store_password
security.ssl.internal.truststore-password: internal_store_password
security.ssl.internal.key-password: internal_store_password


I’ve shipped the keystore to every node, and I get no error when the keystore is read.
However, the application fails to start (it stays stuck in the initializing step), and the only error log in the Yarn containers is:
15:49:46.397 [main-EventThread] ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed


Could you please explain to me what this “zookeeper” Curator connection does and why it no longer works when internal SSL is enabled?



Best regards,

Arnaud





________________________________

L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

RE: "Authentication failed" in "ConnectionState" when enabling internal SSL on Yarn with self signed certificate

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Update:
In fact, this “Authentication failed” message also appears when SSL is turned off (and the Yarn application succeeds), so it is more of a warning and is unrelated to the “freeze” that occurs when SSL is turned on.

Thus, when internal SSL is enabled, I have no error in the Yarn log, and the only error I get is a timeout error like the one you get when you don’t have enough resources:
(NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout)
But I do have enough resources.

From: LINZ, Arnaud
Sent: Tuesday, 22 November 2022 17:18
To: user <us...@flink.apache.org>
Subject: "Authentication failed" in "ConnectionState" when enabling internal SSL on Yarn with self signed certificate

Hello,
I use Flink 1.11.2 in Yarn cluster mode.
I’ve followed the instructions listed here (https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-ssl/) to turn on internal SSL:

$ keytool -genkeypair \
  -alias flink.internal \
  -keystore internal.keystore \
  -dname "CN=flink.internal" \
  -storepass internal_store_password \
  -keyalg RSA \
  -keysize 4096 \
  -storetype PKCS12

security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/flink/conf/internal.keystore
security.ssl.internal.truststore: /path/to/flink/conf/internal.keystore
security.ssl.internal.keystore-password: internal_store_password
security.ssl.internal.truststore-password: internal_store_password
security.ssl.internal.key-password: internal_store_password


I’ve shipped the keystore to every node, and I get no error when the keystore is read.
However, the application fails to start (it stays stuck in the initializing step), and the only error log in the Yarn containers is:
15:49:46.397 [main-EventThread] ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed


Could you please explain to me what this “zookeeper” Curator connection does and why it no longer works when internal SSL is enabled?



Best regards,

Arnaud






"Authentication failed" in "ConnectionState" when enabling internal SSL on Yarn with self signed certificate

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hello,
I use Flink 1.11.2 in Yarn cluster mode.
I’ve followed the instructions listed here (https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-ssl/) to turn on internal SSL:

$ keytool -genkeypair \
  -alias flink.internal \
  -keystore internal.keystore \
  -dname "CN=flink.internal" \
  -storepass internal_store_password \
  -keyalg RSA \
  -keysize 4096 \
  -storetype PKCS12

security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/flink/conf/internal.keystore
security.ssl.internal.truststore: /path/to/flink/conf/internal.keystore
security.ssl.internal.keystore-password: internal_store_password
security.ssl.internal.truststore-password: internal_store_password
security.ssl.internal.key-password: internal_store_password


I’ve shipped the keystore to every node, and I get no error when the keystore is read.
However, the application fails to start (it stays stuck in the initializing step), and the only error log in the Yarn containers is:
15:49:46.397 [main-EventThread] ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed
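
The claim that the keystore reads without error can be double-checked outside Flink. Below is a minimal sketch using the JDK KeyStore API, assuming the path, password and alias placeholders from the keytool command and configuration above. KeystoreCheck and hasKeyEntry are hypothetical names, not part of Flink.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;

class KeystoreCheck {

    /**
     * Returns true if the PKCS12 keystore opens with the given password
     * and holds a private-key entry under the given alias.
     */
    static boolean hasKeyEntry(InputStream in, char[] password, String alias) throws Exception {
        KeyStore ks = KeyStore.getInstance("PKCS12");
        // load() throws if the file is corrupt or the password is wrong
        ks.load(in, password);
        return ks.isKeyEntry(alias);
    }

    public static void main(String[] args) throws Exception {
        // Placeholder path, password and alias taken from the message; adjust as needed
        String path = args.length > 0 ? args[0] : "/path/to/flink/conf/internal.keystore";
        try (InputStream in = new FileInputStream(path)) {
            boolean ok = hasKeyEntry(in, "internal_store_password".toCharArray(), "flink.internal");
            System.out.println("key entry present: " + ok);
        }
    }
}
```

This only confirms that the file is readable on a given node and contains the expected key entry.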


Could you please explain to me what this “zookeeper” Curator connection does and why it no longer works when internal SSL is enabled?



Best regards,

Arnaud






Re: Any way to improve list state get performance

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Tao,

I think you just need an extra `isEmpty` VARIABLE and maintain it properly
(e.g., when restoring the job, check if the list state is empty or not).
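
A minimal sketch of the flag idea, in plain Java with no Flink dependency. CountingListStore is a hypothetical stand-in for the RocksDB-backed list state, and the in-memory set of non-empty keys plays the role of the `isEmpty` variable. Since keyed state is scoped per key, the flag is tracked per key here. Clearing the buffer after a match is an assumption, and rebuilding the set after a restore is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a keyed list state; counts how often it is read. */
class CountingListStore {
    private final Map<String, List<String>> data = new HashMap<>();
    int reads = 0; // number of get() calls that would reach the state backend

    List<String> get(String key) {
        reads++;
        return data.getOrDefault(key, new ArrayList<>());
    }

    void add(String key, String value) {
        data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    void clear(String key) {
        data.remove(key);
    }
}

/** Buffers B elements per key and skips the state read when nothing is buffered. */
class EmptyFlagJoin {
    private final CountingListStore store;
    // In-memory flag per key; NOT checkpointed in this sketch
    private final Set<String> nonEmptyKeys = new HashSet<>();

    EmptyFlagJoin(CountingListStore store) {
        this.store = store;
    }

    void onB(String key, String b) {
        store.add(key, b);
        nonEmptyKeys.add(key);
    }

    List<String> onA(String key, String a) {
        List<String> out = new ArrayList<>();
        if (!nonEmptyKeys.contains(key)) {
            return out; // nothing buffered for this key: no state read
        }
        for (String b : store.get(key)) {
            out.add(a + "+" + b);
        }
        store.clear(key); // assumption: buffered B elements are consumed once
        nonEmptyKeys.remove(key);
        return out;
    }

    public static void main(String[] args) {
        CountingListStore store = new CountingListStore();
        EmptyFlagJoin join = new EmptyFlagJoin(store);
        join.onA("k1", "a0"); // no B buffered yet, so the read is skipped
        join.onB("k1", "b1");
        System.out.println(join.onA("k1", "a1") + " reads=" + store.reads); // prints "[a1+b1] reads=1"
    }
}
```

In a real job the in-memory set is lost on failure, so it would have to be rebuilt on restore, as mentioned above.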

Also, as far as I remember, list state on RocksDB is not as performant as
map state when the state is large. Sometimes you can use a map state
together with some extra value states to simulate a list.
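
One possible shape for the map-based variant, again as plain Java rather than Flink code. The HashMap and the size field are hypothetical stand-ins for a MapState<Long, T> and a ValueState<Long> scoped to one key. The index-as-key layout is an assumption, not something spelled out above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** A list emulated with a map (index -> element) plus a separate size counter. */
class MapBackedList<T> {
    private final Map<Long, T> entries = new HashMap<>(); // stand-in for a map state
    private long size = 0;                                // stand-in for a value state

    void add(T value) {
        entries.put(size, value); // next free index is the current size
        size++;
    }

    /** Emptiness check that only looks at the counter, never at the entries. */
    boolean isEmpty() {
        return size == 0;
    }

    /** Reads the elements back in insertion order, one entry at a time. */
    List<T> toList() {
        List<T> out = new ArrayList<>();
        for (long i = 0; i < size; i++) {
            out.add(entries.get(i));
        }
        return out;
    }

    void clear() {
        entries.clear();
        size = 0;
    }

    public static void main(String[] args) {
        MapBackedList<String> buffered = new MapBackedList<>();
        System.out.println(buffered.isEmpty()); // prints "true"
        buffered.add("b1");
        buffered.add("b2");
        System.out.println(buffered.toList()); // prints "[b1, b2]"
    }
}
```

In this sketch the emptiness check only touches the size counter, and elements are read entry by entry instead of as one list value.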

Best,
Xingcan

On Mon, Nov 21, 2022 at 9:20 PM tao xiao <xi...@gmail.com> wrote:

> Any suggestions would be highly appreciated.
>
> On Tue, Nov 15, 2022 at 8:50 PM tao xiao <xi...@gmail.com> wrote:
>
>> Hi team,
>>
>> I have a Flink job that joins two streams, say A and B, followed by a
>> keyed process function. In the keyed process function, the job adds
>> elements from stream B to a list state if the element from stream A
>> hasn't arrived yet. Is there any way to skip the liststate.get() call
>> that checks whether the list state holds elements when an element from
>> stream A arrives, to reduce the calls to the underlying state backend
>> (RocksDB)?
>>
>> Here is the code snippet:
>>
>> keyfunction {
>>
>>   process(in, ctx, collector) {
>>     if (in is A stream) {
>>       // any way to check whether the list state is empty, so that we
>>       // don't need to call get()?
>>       for (b : liststate.get()) {
>>         .....
>>       }
>>     }
>>
>>     if (in is B stream) {
>>       liststate.add(in)
>>     }
>>   }
>> }
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>

Re: Any way to improve list state get performance

Posted by tao xiao <xi...@gmail.com>.
Any suggestions would be highly appreciated.

On Tue, Nov 15, 2022 at 8:50 PM tao xiao <xi...@gmail.com> wrote:

> Hi team,
>
> I have a Flink job that joins two streams, say A and B, followed by a
> keyed process function. In the keyed process function, the job adds
> elements from stream B to a list state if the element from stream A
> hasn't arrived yet. Is there any way to skip the liststate.get() call
> that checks whether the list state holds elements when an element from
> stream A arrives, to reduce the calls to the underlying state backend
> (RocksDB)?
>
> Here is the code snippet:
>
> keyfunction {
>
>   process(in, ctx, collector) {
>     if (in is A stream) {
>       // any way to check whether the list state is empty, so that we
>       // don't need to call get()?
>       for (b : liststate.get()) {
>         .....
>       }
>     }
>
>     if (in is B stream) {
>       liststate.add(in)
>     }
>   }
> }
>
>
> --
> Regards,
> Tao
>


-- 
Regards,
Tao