Posted to user@flink.apache.org by Prabhu V <vp...@gmail.com> on 2017/08/14 06:15:34 UTC

kerberos yarn - failure in long running streaming application

Hi,

I am running Flink 1.3.2 on YARN (Cloudera 2.6.0-cdh5.7.6). The application
streams data from Kafka, groups by key, creates a session window, and writes
to HDFS using a rich window function in the "window.apply" method.

The rich window function creates the sequence file as follows:

SequenceFile.createWriter(
                conf,
                new Option[] {
                        Writer.file(new Path("flink-output/" + filePath)),
                        Writer.compression(CompressionType.BLOCK,
                                new DefaultCodec()),
                        Writer.keyClass(BytesWritable.class),
                        Writer.valueClass(BytesWritable.class) })

The "conf" is created in the "open" method thus

conf = HadoopFileSystem.getHadoopConfiguration();
for (Map.Entry<String, String> entry : parameters.toMap().entrySet()) {
    conf.set(entry.getKey(), entry.getValue());
}

where "parameters" is the org.apache.flink.configuration.Configuration object
that is passed as an argument to the open method.
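
For reference, here is a minimal, self-contained sketch of how the pieces
described above could fit together. The class name, the Tuple2<byte[], byte[]>
element type, and the per-key/per-window file naming are illustrative
assumptions, not details from the actual application:

import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.compress.DefaultCodec;

public class SequenceFileWindowWriter
        extends RichWindowFunction<Tuple2<byte[], byte[]>, Void, String, TimeWindow> {

    private transient org.apache.hadoop.conf.Configuration conf;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Start from Flink's Hadoop configuration and overlay the Flink parameters.
        conf = HadoopFileSystem.getHadoopConfiguration();
        for (Map.Entry<String, String> entry : parameters.toMap().entrySet()) {
            conf.set(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public void apply(String key, TimeWindow window,
                      Iterable<Tuple2<byte[], byte[]>> input,
                      Collector<Void> out) throws Exception {
        // Illustrative naming scheme: one file per key and window.
        String filePath = key + "-" + window.getEnd();
        // try-with-resources closes (and flushes) the writer when the window is done.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(
                conf,
                Writer.file(new Path("flink-output/" + filePath)),
                Writer.compression(CompressionType.BLOCK, new DefaultCodec()),
                Writer.keyClass(BytesWritable.class),
                Writer.valueClass(BytesWritable.class))) {
            for (Tuple2<byte[], byte[]> record : input) {
                writer.append(new BytesWritable(record.f0), new BytesWritable(record.f1));
            }
        }
    }
}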

The application runs for about 10 hours before it fails with the Kerberos
error "Caused by: javax.security.sasl.SaslException: GSS initiate failed
[Caused by GSSException: No valid credentials provided (Mechanism level:
Failed to find any Kerberos tgt)]".

The flink-conf.yaml has the following properties set.
security.kerberos.login.keytab: <keytab location>
security.kerberos.login.principal: <principal>
security.kerberos.login.contexts: Client,KafkaClien

Any help would be appreciated.


Thanks,
Prabhu

Re: kerberos yarn - failure in long running streaming application

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

At first glance this seems odd, since a keytab would not expire unless the principal’s password expires or is changed.

Was the principal’s password set to expire, or was it changed? The keytab would also become invalid in that case.

Cheers,
Gordon


Re: kerberos yarn - failure in long running streaming application

Posted by Prabhu V <vp...@gmail.com>.
Thanks for helping fix the issue, Eron.

//Eron's email on this issue

I see two interesting things in the log. One, the TGT has an expiry of 10
hours, according to the Kafka log output:

> 2017-08-13 06:14:48,248 INFO  org.apache.kafka.common.security.kerberos.Login - TGT valid starting at: Sun Aug 13 06:14:48 UTC 2017

> 2017-08-13 06:14:48,249 INFO  org.apache.kafka.common.security.kerberos.Login - TGT expires: Sun Aug 13 16:14:48 UTC 2017

So we can say that this problem is related to the relogin thread (or lack
thereof).


The reason that renewal isn't working is probably because Hadoop 2.3 code
is being used by Flink:

> 2017-08-13 06:14:40,044 INFO  org.apache.flink.yarn.YarnTaskManagerRunner - Hadoop version: 2.3.0

The Hadoop dependencies are shaded inside Flink's libraries. The CDH
libraries that I see on the classpath aren't really used, AFAIK. Maybe try
using a build of Flink based on Hadoop 2.6 to match the CDH environment.

Hope this helps!

//End email


The issue was the incorrect Hadoop version resulting from the way I built
the project. My build configuration includes the following dependencies:
    compile 'org.apache.flink:flink-streaming-java_2.10:1.3.2'
    compile 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.3.2'
    compile 'org.apache.flink:flink-connector-filesystem_2.10:1.3.2'

I am using Flink built for Hadoop 2.6
(flink-1.3.2-bin-hadoop26-scala_2.10.tgz,
<http://mirror.cc.columbia.edu/pub/software/apache/flink/flink-1.3.2/flink-1.3.2-bin-hadoop26-scala_2.10.tgz>).

I did NOT build an uber jar; instead I included the dependency jars in the
runtime classpath. The dependencies included flink-shaded-hadoop2-1.3.2.jar,
which contained classes belonging to Hadoop 2.3 and was causing the issue.
Removing this jar from the classpath fixed the problem. The Hadoop version
is now 2.6.3.
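
As a quick sanity check (a hedged sketch, not something from the thread; the
class and logger names are illustrative), the Hadoop version that actually got
loaded can be logged from the job itself, e.g. once from an open() method:

import org.apache.hadoop.util.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper: logs which Hadoop build the classloader actually picked
// up, which is what matters when several Hadoop versions are on the classpath.
public class HadoopVersionCheck {

    private static final Logger LOG = LoggerFactory.getLogger(HadoopVersionCheck.class);

    public static void logHadoopVersion() {
        LOG.info("Hadoop version on classpath: {} ({})",
                VersionInfo.getVersion(), VersionInfo.getBuildVersion());
    }
}

A mismatch here (e.g. 2.3.0 instead of 2.6.x) would point to the same stale
flink-shaded-hadoop2 jar described above.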

Thanks,
Prabhu


Re: kerberos yarn - failure in long running streaming application

Posted by Eron Wright <er...@gmail.com>.
It sounds to me that the TGT is expiring (usually after 12 hours). This
shouldn't happen in the keytab scenario because of a background thread
provided by Hadoop that periodically performs a re-login using the keytab.
More details on the Hadoop internals here:
https://stackoverflow.com/a/34691071/3026310
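
(For context, a rough sketch of the Hadoop-side mechanism being referred to;
the principal and keytab path below are placeholders, and Flink's
security.kerberos.login.* settings normally drive this for you, so this is
illustration rather than code to add to the job.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KeytabReloginSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);

        // Initial login; Hadoop remembers the keytab so it can log in again later.
        UserGroupInformation.loginUserFromKeytab("principal@EXAMPLE.COM", "/path/to/user.keytab");

        // A long-running process periodically refreshes the TGT from the keytab
        // before the ticket lifetime (often 10-12 hours) runs out.
        UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
    }
}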

To help narrow down the issue:
1. please share the stack trace (and, does the error occur on Job Manager
or on Task Manager?)
2. is kinit being called on the client prior to calling `flink run`?  (just
curious)
3. are you willing to share the Flink logs?

I'm happy to help if you prefer to share the logs privately.

-Eron


Re: kerberos yarn - failure in long running streaming application

Posted by Ted Yu <yu...@gmail.com>.
bq. security.kerberos.login.contexts: Client,KafkaClien

Just curious: there is a missing 't' at the end of the above line.

Maybe a typo when composing the email?
