Posted to user@flink.apache.org by "LINZ, Arnaud" <AL...@bouyguestelecom.fr> on 2015/08/20 16:08:21 UTC

Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

Hello,

My application handles as input and output some HDFS files in the jobs and in the driver application.
It works in local cluster mode, but when I submit it through the YARN client and try to use a HadoopInputFormat (built from an HCatalog request), I get the following error: Delegation Token can be issued only with kerberos or web authentication (full stack trace below).

Here is the code that I believe causes the error (it is not obvious from the stack trace, as the nearest point in my own code is “execEnv.execute()”):

public synchronized DataSet<T> readTable(String dbName, String tableName, String filter, ExecutionEnvironment cluster,
        final HiveBeanFactory<T> factory) throws IOException {

        // login kerberos if needed (via UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());)
        HdfsTools.getFileSystem();

        // Create M/R job and configure it
        final Job job = Job.getInstance();
        job.setJobName("Flink source for Hive Table " + dbName + "." + tableName);

        // Create the source
        @SuppressWarnings({ "unchecked", "rawtypes" })
        final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat = new HadoopInputFormat<NullWritable, // CHECKSTYLE:OFF
        DefaultHCatRecord>(// CHECKSTYLE:ON
            (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter), //
            NullWritable.class, //
            DefaultHCatRecord.class, //
            job);

        final HCatSchema inputSchema = HCatInputFormat.getTableSchema(job.getConfiguration());
        @SuppressWarnings("serial")
        final DataSet<T> dataSet = cluster
        // Read the table
            .createInput(inputFormat)
            // map bean (key is useless)
            .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
                @Override
                public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value, Collector<T> out) throws Exception {  // NOPMD
                    final T record = factory.fromHive(value.f1, inputSchema);
                    if (record != null) {
                        out.collect(record);
                    }
                }
            }).returns(beanClass);

        return dataSet;
    }

Maybe I need to explicitly obtain a token on each node in the initialization of HadoopInputFormat() (by overriding configure())? That would be difficult, since the keytab file is on the driver’s local drive…
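
For illustration, the kind of override I have in mind would look roughly like this (completely untested; it also assumes HadoopInputFormat can be subclassed and that the keytab would be readable at the same path on every worker node, which is not the case here):

        // Untested sketch only: replace the inputFormat construction above with an
        // anonymous subclass that logs in from the keytab when the format is
        // configured on each node (serialization concerns are ignored here).
        final HadoopInputFormat<NullWritable, DefaultHCatRecord> secureInputFormat =
            new HadoopInputFormat<NullWritable, DefaultHCatRecord>(
                (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter),
                NullWritable.class,
                DefaultHCatRecord.class,
                job) {

                @Override
                public void configure(org.apache.flink.configuration.Configuration parameters) {
                    try {
                        // Would require the keytab to be present locally on the worker.
                        UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());
                    } catch (IOException excep) {
                        throw new RuntimeException("Kerberos login on worker failed", excep);
                    }
                    super.configure(parameters);
                }
            };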

StackTrace :

Found YARN properties file /usr/lib/flink/bin/../conf/.yarn-properties
Using JobManager address from YARN properties bt1svlmw.bpa.bouyguestelecom.fr/172.19.115.52:50494
Secure Hadoop environment setup detected. Running in secure context.
2015:08:20 15:04:17 (main) - INFO - com.bouygtel.kuberasdk.main.Application.mainMethod - Début traitement
15:04:18,005 INFO  org.apache.hadoop.security.UserGroupInformation               - Login successful for user alinz using keytab file /usr/users/alinz/alinz.keytab
15:04:20,139 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Error : Execution Kubera KO : java.lang.IllegalStateException: Error while executing Flink application
com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:84)
com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68)
com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51)
com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81)
com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
org.apache.flink.client.program.Client.run(Client.java:315)
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873)
org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870)
org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50)
java.security.AccessController.doPrivileged(Native Method)
javax.security.auth.Subject.doAs(Subject.java:415)
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47)
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870)
org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)

Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job dddaf104260eb0f56ff336727ceeb49e (KUBERA-GEO-BRUT2SEGMENT)
org.apache.flink.client.program.Client.run(Client.java:413)
org.apache.flink.client.program.Client.run(Client.java:356)
org.apache.flink.client.program.Client.run(Client.java:349)
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:80)
com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68)
com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51)
com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81)
com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
org.apache.flink.client.program.Client.run(Client.java:315)
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873)
org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870)
org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50)
java.security.AccessController.doPrivileged(Native Method)
javax.security.auth.Subject.doAs(Subject.java:415)
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47)
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870)
org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job dddaf104260eb0f56ff336727ceeb49e (KUBERA-GEO-BRUT2SEGMENT)
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:594)
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Delegation Token can be issued only with kerberos or web authentication
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7609)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:522)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:977)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)

org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token can be issued only with kerberos or web authentication
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7609)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:522)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:977)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)

org.apache.hadoop.ipc.Client.call(Client.java:1468)
org.apache.hadoop.ipc.Client.call(Client.java:1399)
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
com.sun.proxy.$Proxy14.getDelegationToken(Unknown Source)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:909)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
com.sun.proxy.$Proxy15.getDelegationToken(Unknown Source)
org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1029)
org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1355)
org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:529)
org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:507)
org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2041)
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:121)
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:205)
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
org.apache.hive.hcatalog.mapreduce.HCatBaseInputFormat.getSplits(HCatBaseInputFormat.java:157)
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:140)
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:51)
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Do you have any clue?

Best regards,
Arnaud



________________________________


The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

Posted by Stephan Ewen <se...@apache.org>.
I think we need to extend our own FileInputFormats as well to pass the
credentials...


Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

Posted by Robert Metzger <rm...@apache.org>.
I was able to reproduce the issue. This is the JIRA:
https://issues.apache.org/jira/browse/FLINK-2555
I've already opened a pull request with the fix.

The problem was that our HadoopInputFormat wrapper was not correctly
passing the security credentials from the Job object to the cluster.

Consider this code posted by Arnaud in the initial message:

final Job job = Job.getInstance();

        job.setJobName("Flink source for Hive Table " + dbName + "." + tableName);

        // Create the source
        @SuppressWarnings({ "unchecked", "rawtypes" })
        final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat = new HadoopInputFormat<NullWritable, // CHECKSTYLE:OFF
        DefaultHCatRecord>(// CHECKSTYLE:ON
            (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter), //
            NullWritable.class, //
            DefaultHCatRecord.class, //
            job);


in the "Job.getInstance()" call, the current authentication credentials of
the user are stored.

They are later passed to the HadoopInputFormat class (last line), but Flink
was not properly making the Credentials available again on the cluster.
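
The core idea, very roughly, is something like the sketch below (an illustration only, not the actual code of the pull request; the class and method names are made up): the Credentials stored in the Job on the client are serialized together with the input format wrapper and handed back to the current user on the cluster side, so that the tokens are available when the input splits are created.

// Illustrative sketch only, not the FLINK-2555 patch itself.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

public final class CredentialShippingSketch {

    /** Client side: serialize the Credentials that Job.getInstance() captured. */
    static byte[] shipCredentials(Job job) throws IOException {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        job.getCredentials().write(new DataOutputStream(bytes));
        return bytes.toByteArray();
    }

    /** Cluster side: read the Credentials back and attach them to the current user,
     *  so that HDFS / HCatalog calls made while creating splits can use the tokens. */
    static void restoreCredentials(byte[] shipped) throws IOException {
        final Credentials credentials = new Credentials();
        credentials.readFields(new DataInputStream(new ByteArrayInputStream(shipped)));
        UserGroupInformation.getCurrentUser().addCredentials(credentials);
    }
}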


The pull request should resolve the issue (I've verified it on a secured
CDH 5.3 setup).


Thank you for reporting the bug!



On Thu, Aug 20, 2015 at 5:21 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr>
wrote:

> Hi Robert,
>
>
>
> Yes, it’s an internal tool which gets an HDFS FileSystem instance. I do
> some Kerberos-related operations, needed because I manipulate some HDFS
> files before executing the application.
>
> The local cluster mode is working fine with the same code, and it does
> some HCat reading / HDFS writing.
>
>
>
> What HdfsTools does, in a nutshell :
>
>   final Configuration cfg = new Configuration();
>
>         cfg.addResource(new Path("/home/hadoop/conf/core-site.xml"));
>
>         cfg.addResource(new Path("/home/hadoop/conf/hdfs-site.xml"));
>
>         cfg.addResource(new Path(Environnement.getEnvVarQuietly("HADOOP_CONF_DIR") + "/core-site.xml"));
>
>         cfg.addResource(new Path(Environnement.getEnvVarQuietly("HADOOP_CONF_DIR") + "/hdfs-site.xml"));
>
>         // Kerberos handling
>         if (isKerberosActive()) {
>             loginKerberos(cfg);
>         }
>
>         filesys = FileSystem.get(cfg);
>
>
>
> And the straightforward kerberos stuff:
>
> public static synchronized void loginKerberos(Configuration cfg) {
>         UserGroupInformation.setConfiguration(cfg);
>         if (!loggedIn) {
>             try {
>                 UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());
>                 loggedIn = true;
>                 JournalUDF.logLocalFS("User " + UserGroupInformation.getLoginUser() + " : Kerberos login succeeded ");
>             } catch (IOException excep) {
>                 throw new GaneshRuntimeException("Unable to log (kerberos) : " + excep.toString(), excep);
>             }
>         }
>     }
>
> loggedIn being static to the class, and alinz having all the proper
> rights.
>
>
>
> From what I’ve seen on Google, Spark and Hive/Oozie ran into the same
> error and somewhat corrected it, but I don’t know if it will help to see
> if it’s really the same problem.
>
> I’m sending you the full trace on a private mail.
>
>
>
> Arnaud
>
>
>
> From: Robert Metzger [mailto:rmetzger@apache.org]
> Sent: Thursday, August 20, 2015 16:42
> To: user@flink.apache.org
> Subject: Re: Using HadoopInputFormat files from Flink/Yarn in a secure
> cluster gives an error
>
>
>
> Hi Arnaud,
>
>
>
> I suspect the "HdfsTools" are something internal from your company?
>
> Are they doing any kerberos-related operations?
>
>
>
> Is the local cluster mode also reading files from the secured HDFS cluster?
>
>
>
> Flink is taking care of sending the authentication tokens from the client
> to the jobManager and to the TaskManagers.
>
> For HDFS Flink should also use these user settings.
>
> I don't know whether the HCatalog code / Hadoop compatibility code is also
> doing some kerberos operations which are interfering with our efforts.
>
>
>
> From the logs, you can see:
>
> Secure Hadoop environment setup detected. Running in secure context.
> 15:04:18,005 INFO
> org.apache.hadoop.security.UserGroupInformation               - Login
> successful for user alinz using keytab file /usr/users/alinz/alinz.keytab
>
>
>
> Is the user "alinz" authorized to access the files in HDFS?
>
>
>
> I have to admit that I didn't see this issue before.
>
> If possible, can you privately send the full log of the application,
> using "yarn logs -applicationId <ID>" ?
>
>
>
>
>
> On Thu, Aug 20, 2015 at 4:08 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr>
> wrote:
>
> Hello,
>
>
>
> My application handles as input and output some HDFS files in the jobs and
> in the driver application.
>
> It works in local cluster mode, but when I’m trying to submit it to a yarn
> client, when I try to use a HadoopInputFormat (that comes from a HCatalog
> request), I have the following error: *Delegation Token can be issued
> only with kerberos or web authentication *(full stack trace below).
>
>
>
> Code which I believe causes the error (It’s not clear in the stack trace,
> as the nearest point in my code is “execEnv.execute()”) :
>
>
>
> *public* *synchronized* DataSet<T> readTable(String dbName, String
> tableName, String filter, ExecutionEnvironment cluster,
>
>         *final* HiveBeanFactory<T> factory) *throws* IOException {
>
>
>
>         // login kerberos if needed (via
> UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(),
> getKerberosKeytab());)
>
>         HdfsTools.*getFileSystem*();
>
>
>
>         // Create M/R job and configure it
>
>         *final* Job job = Job.*getInstance*();
>
>         job.setJobName("Flink source for Hive Table " + dbName + "." +
> tableName);
>
>
>
>         // Create the source
>
>         @SuppressWarnings({ "unchecked", "rawtypes" })
>
>         *final* HadoopInputFormat<NullWritable, DefaultHCatRecord>
> inputFormat = *new* HadoopInputFormat<NullWritable, // CHECKSTYLE:OFF
>
>         DefaultHCatRecord>(// CHECKSTYLE:ON
>
>             (InputFormat) HCatInputFormat.*setInput*(job, dbName,
> tableName, filter), //
>
>             NullWritable.*class*, //
>
>             DefaultHCatRecord.*class*, //
>
>             job);
>
>
>
>         *final* HCatSchema inputSchema = HCatInputFormat.*getTableSchema*(
> job.getConfiguration());
>
>         @SuppressWarnings("serial")
>
>         *final* DataSet<T> dataSet = cluster
>
>         // Read the table
>
>             .createInput(inputFormat)
>
>             // map bean (key is useless)
>
>             .flatMap(*new* FlatMapFunction<Tuple2<NullWritable,
> DefaultHCatRecord>, T>() {
>
>                 @Override
>
>                 *public* *void* flatMap(Tuple2<NullWritable,
> DefaultHCatRecord> value, Collector<T> out) *throws* Exception {  // NOPMD
>
>                     *final* T record = factory.fromHive(value.f1,
> inputSchema);
>
>                     *if* (record != *null*) {
>
>                         out.collect(record);
>
>                     }
>
>                 }
>
>             }).returns(beanClass);
>
>
>
>         *return* dataSet;
>
>     }
>
>
>
> Maybe I need to explicitly get a token on each node in the
> initialization of HadoopInputFormat() (overriding configure()) ? That
> would be difficult since the keyfile is on the driver’s local drive…
>
>
>
> StackTrace :
>
>
>
> Found YARN properties file /usr/lib/flink/bin/../conf/.yarn-properties
>
> Using JobManager address from YARN properties
> bt1svlmw.bpa.bouyguestelecom.fr/172.19.115.52:50494
>
> Secure Hadoop environment setup detected. Running in secure context.
>
> 2015:08:20 15:04:17 (main) - INFO -
> com.bouygtel.kuberasdk.main.Application.mainMethod - Début traitement
>
> 15:04:18,005 INFO
> org.apache.hadoop.security.UserGroupInformation               - Login
> successful for user alinz using keytab file /usr/users/alinz/alinz.keytab
>
> 15:04:20,139 WARN
> org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The
> short-circuit local reads feature cannot be used because libhadoop cannot
> be loaded.
>
> Error : Execution Kubera KO : java.lang.IllegalStateException: Error while
> executing Flink application
>
>
> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:84)
>
>
> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68)
>
> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51)
>
> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81)
>
> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
>
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> java.lang.reflect.Method.invoke(Method.java:606)
>
>
> [... remainder of quoted original message and stack trace snipped ...]

RE: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi Robert,

Yes, it's an internal tool that gets an HDFS FileSystem instance. It does some Kerberos-related operations, which are needed because I manipulate some HDFS files before executing the application.
The local cluster mode is working fine with the same code, and it does some HCat reading / HDFS writing.

What HdfsTools does, in a nutshell:
    // Build a Configuration from the local Hadoop config files
    final Configuration cfg = new Configuration();
    cfg.addResource(new Path("/home/hadoop/conf/core-site.xml"));
    cfg.addResource(new Path("/home/hadoop/conf/hdfs-site.xml"));
    cfg.addResource(new Path(Environnement.getEnvVarQuietly("HADOOP_CONF_DIR") + "/core-site.xml"));
    cfg.addResource(new Path(Environnement.getEnvVarQuietly("HADOOP_CONF_DIR") + "/hdfs-site.xml"));
    // Kerberos handling
    if (isKerberosActive()) {
        loginKerberos(cfg);
    }
    filesys = FileSystem.get(cfg);

And the straightforward Kerberos stuff:
public static synchronized void loginKerberos(Configuration cfg) {
        UserGroupInformation.setConfiguration(cfg);
        if (!loggedIn) {
            try {
                UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());
                loggedIn = true;
                JournalUDF.logLocalFS("User " + UserGroupInformation.getLoginUser() + " : Kerberos login succeeded ");
            }
            catch (IOException excep) {
                throw new GaneshRuntimeException("Unable to log (kerberos) : " + excep.toString(), excep);
            }
        }
    }
loggedIn is a static field of the class, and alinz has all the proper rights.
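
(For comparison only, not what our code currently does: a hypothetical variant of the same login that creates the FileSystem explicitly under the keytab identity via doAs, in case some other library call has replaced the current user. It reuses cfg and the getKerberosPrincipal()/getKerberosKeytab() helpers from the snippets above.)

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.security.UserGroupInformation;

    // Hypothetical alternative: log in from the keytab and get a dedicated UGI,
    // then create the FileSystem as that user instead of relying on the
    // process-wide static login.
    final UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
        getKerberosPrincipal(), getKerberosKeytab());
    final FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
        @Override
        public FileSystem run() throws Exception {
            return FileSystem.get(cfg);
        }
    });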

From what I've seen on Google, Spark and Hive/Oozie ran into the same error and fixed it on their side, but I don't know whether it's really the same problem or whether their fixes would help here; a rough sketch of the idea is below.
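
(Roughly, what they do is fetch the HDFS delegation tokens on the client, where the keytab/TGT is available, and attach them to the job's credentials so that nothing on the cluster side has to call getDelegationToken itself. A minimal, hypothetical sketch using only the plain Hadoop API, not something taken from our code or from Flink; the "yarn" renewer principal and the class name are made up:)

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.security.Credentials;

    public final class DelegationTokenSketch {
        // Call on the client, after the keytab login, before submitting the job.
        public static void addHdfsTokens(Job job) throws Exception {
            final Configuration conf = job.getConfiguration();
            final Credentials creds = job.getCredentials();
            final FileSystem fs = FileSystem.get(conf);
            // "yarn" is a placeholder renewer principal; the real value is cluster-specific.
            fs.addDelegationTokens("yarn", creds);
            // The tokens now travel inside the job's credentials instead of being
            // requested again (and failing) when the input splits are created.
        }
    }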
I'm sending you the full trace in a private mail.

Arnaud

From: Robert Metzger [mailto:rmetzger@apache.org]
Sent: Thursday, August 20, 2015 16:42
To: user@flink.apache.org
Subject: Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

Hi Arnaud,

I suspect the "HdfsTools" are something internal from your company?
Are they doing any kerberos-related operations?

Is the local cluster mode also reading files from the secured HDFS cluster?

Flink is taking care of sending the authentication tokens from the client to the JobManager and to the TaskManagers.
For HDFS, Flink should also use these user settings.
I don't know whether the HCatalog code / Hadoop compatibility code is also doing some Kerberos operations which are interfering with our efforts.

From the logs, you can see:
Secure Hadoop environment setup detected. Running in secure context.
15:04:18,005 INFO  org.apache.hadoop.security.UserGroupInformation               - Login successful for user alinz using keytab file /usr/users/alinz/alinz.keytab

Is the user "alinz" authorized to access the files in HDFS?

I have to admit that I didn't see this issue before.
If possible, can you privately send the full log of the application, using "yarn logs -applicationId <ID>"?


On Thu, Aug 20, 2015 at 4:08 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:

[... original message and stack trace quoted in full; snipped ...]


Re: Using HadoopInputFormat files from Flink/Yarn in a secure cluster gives an error

Posted by Robert Metzger <rm...@apache.org>.
Hi Arnaud,

I suspect the "HdfsTools" are something internal from your company?
Are they doing any kerberos-related operations?

Is the local cluster mode also reading files from the secured HDFS cluster?

Flink is taking care of sending the authentication tokens from the client
to the JobManager and to the TaskManagers.
For HDFS, Flink should also use these user settings.
I don't know whether the HCatalog code / Hadoop compatibility code is also
doing some Kerberos operations which are interfering with our efforts.
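
(As a quick check, here is a minimal sketch, plain UserGroupInformation API and nothing Flink-specific, that could be dropped into the driver to print which login the Hadoop client classes actually see; the class name is only illustrative:)

    import java.io.IOException;
    import org.apache.hadoop.security.UserGroupInformation;

    public final class UgiDebug {
        // Print the identity and credentials the Hadoop client libraries will use.
        public static void dump() throws IOException {
            final UserGroupInformation current = UserGroupInformation.getCurrentUser();
            final UserGroupInformation login = UserGroupInformation.getLoginUser();
            System.out.println("current user         : " + current.getUserName());
            System.out.println("authentication method: " + current.getAuthenticationMethod());
            System.out.println("kerberos credentials : " + current.hasKerberosCredentials());
            System.out.println("login user           : " + login.getUserName());
            System.out.println("delegation tokens    : " + current.getTokens());
        }
    }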

From the logs, you can see:

> Secure Hadoop environment setup detected. Running in secure context.
> 15:04:18,005 INFO
> org.apache.hadoop.security.UserGroupInformation               - Login
> successful for user alinz using keytab file /usr/users/alinz/alinz.keytab


Is the user "alinz" authorized to access the files in HDFS?


I have to admit that I didn't see this issue before.

If possible, can you privately send the full log of the application,
using "yarn logs -applicationId <ID>"?



On Thu, Aug 20, 2015 at 4:08 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:

> [... original message and stack trace quoted in full; snipped ...]