Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/08/21 12:39:45 UTC

[jira] [Commented] (FLINK-2555) Hadoop Input/Output Formats are unable to access secured HDFS clusters

    [ https://issues.apache.org/jira/browse/FLINK-2555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14706528#comment-14706528 ] 

ASF GitHub Bot commented on FLINK-2555:
---------------------------------------

GitHub user rmetzger opened a pull request:

    https://github.com/apache/flink/pull/1038

    [FLINK-2555] Properly pass security credentials in the Hadoop Input/Output format wrappers

    This is needed because the Hadoop IF/OFs use Hadoop's FileSystem stack, which relies on the security credentials passed in via the JobConf / Job class in the getSplits() method.
    
    Note that access to secured Hadoop 1.x clusters using the Hadoop IF/OFs is still not possible, even with this change. This limitation is due to missing methods in the old APIs.
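
    As a minimal sketch of the idea (not the exact patch; the class and method
    names below are hypothetical, the Hadoop calls are real): the mapreduce API
    exposes the job's credentials, so a wrapper can merge in the submitting
    user's tokens before getSplits() runs.

        // Sketch against the Hadoop 2.x mapreduce API: merge the submitting
        // user's tokens into the Job so that getSplits() can authenticate
        // against a secured NameNode. The surrounding class is hypothetical.
        import java.io.IOException;

        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.security.Credentials;
        import org.apache.hadoop.security.UserGroupInformation;

        public final class CredentialForwarding {
            public static void attachCurrentUserTokens(Job job) throws IOException {
                // Tokens of the user submitting the job
                Credentials userCreds =
                        UserGroupInformation.getCurrentUser().getCredentials();
                // Credentials added here travel with the Job into getSplits()
                job.getCredentials().addAll(userCreds);
            }
        }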
    
    I've also updated the "de.javakaffee.kryo-serializers" dependency from 0.27 to 0.36, because a user on the ML recently needed a specific Kryo serializer that was not available in the old version.
    
    For the Java and Scala APIs, I renamed the first argument: `readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job)`
    
    This makes it easier to distinguish between the mapreduce and the mapred variants in IDE completion. (Before, the argument was always called `mapredInputFormat`; now the `mapreduceInputFormat` name is used where applicable.)
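
    For illustration, a call using the renamed mapreduce-based variant could
    look like this (the input path is made up; the signature is the one quoted
    above):

        // Sketch: reading Tuple2<LongWritable, Text> records through the
        // mapreduce-based wrapper. IDE completion now shows the parameter
        // name mapreduceInputFormat, distinguishing it from the mapred overload.
        import org.apache.flink.api.java.DataSet;
        import org.apache.flink.api.java.ExecutionEnvironment;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Job;
        import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

        public class ReadHadoopFileExample {
            public static void main(String[] args) throws Exception {
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                // Line offset + line text pairs, as produced by TextInputFormat
                DataSet<Tuple2<LongWritable, Text>> lines = env.readHadoopFile(
                        new TextInputFormat(), LongWritable.class, Text.class,
                        "hdfs:///path/to/input", Job.getInstance());
                lines.print();
            }
        }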

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rmetzger/flink flink2555

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1038.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1038
    
----
commit bac21bf5d77c8e15c608ecbf006d29e7af1dd68a
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2015-07-23T13:12:38Z

    [FLINK-2398][api-breaking] Introduce StreamGraphGenerator
    
    This decouples the building of the StreamGraph from the API methods.
    Before, the methods would build the StreamGraph as they went. Now the API
    methods build a hierarchy of StreamTransformation nodes, from which a
    StreamGraph is generated upon execution.
    
    This also introduces some API breaking changes:
    
     - The result of methods that create sinks is now DataStreamSink instead
       of DataStream
     - Iterations cannot have feedback edges with differing parallelism
     - "Preserve partitioning" is not the default for feedback edges. The
       previous option for this is removed.
     - You can close an iteration several times, no need for a union.
     - Strict checking of whether partitioning and parallelism work
       together: if upstream and downstream parallelism don't match, Forward
       partitioning is no longer legal. The previous behavior was not very
       transparent: when going from low to high parallelism, some downstream
       operators would never get any input; when going from high to low
       parallelism, the downstream operators would be skewed, because all
       elements that would be forwarded to an operator that is not "there"
       go to another operator instead. This requires inserting global() or
       rebalance() in some places, for example after most sources, which
       have parallelism one (see the sketch after this commit message).
    
    This also makes StreamExecutionEnvironment.execute() behave consistently
    across different execution environments (local, remote, ...): the list of
    operators to be executed is cleared after execute() is called.
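
    As a sketch of the strict-checking point above (the topology is made up;
    the API calls are from the DataStream API): a parallelism-1 source feeding
    parallelism-4 operators now needs an explicit global() or rebalance(), and
    sink methods return DataStreamSink.

        // Sketch: explicit redistribution after a parallelism-1 source.
        import org.apache.flink.api.common.functions.MapFunction;
        import org.apache.flink.streaming.api.datastream.DataStreamSink;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class RebalanceAfterSource {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env =
                        StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(4);
                DataStreamSink<String> sink = env
                        .socketTextStream("localhost", 9999) // runs with parallelism 1
                        .rebalance() // forward from parallelism 1 to 4 is now rejected
                        .map(new MapFunction<String, String>() {
                            @Override
                            public String map(String value) {
                                return value.toUpperCase();
                            }
                        })
                        .print(); // sink methods now return DataStreamSink
                env.execute("rebalance example");
            }
        }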

commit e4b72e6d0148d071a97d2dab5c3bd97b81ee97a5
Author: Robert Metzger <rm...@apache.org>
Date:   2015-08-20T16:43:04Z

    [FLINK-2555] Properly pass security credentials in the Hadoop Input/Output format wrappers
    
    This is needed because the Hadoop IF/OFs use Hadoop's FileSystem stack, which relies on
    the security credentials passed in via the JobConf / Job class in the getSplits() method.
    
    Note that access to secured Hadoop 1.x clusters using the Hadoop IF/OFs is still not possible, even with this change.
    This limitation is due to missing methods in the old APIs.

----


> Hadoop Input/Output Formats are unable to access secured HDFS clusters
> ----------------------------------------------------------------------
>
>                 Key: FLINK-2555
>                 URL: https://issues.apache.org/jira/browse/FLINK-2555
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 0.9, 0.10
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>            Priority: Critical
>
> It seems that authentication tokens are not passed correctly to the input format when accessing secured HDFS clusters.
> Exception
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job b319a28f62855917901cfb67c5457142 (Flink Java Job at Thu Aug 20 10:46:41 PDT 2015)
> 	at org.apache.flink.client.program.Client.run(Client.java:413)
> 	at org.apache.flink.client.program.Client.run(Client.java:356)
> 	at org.apache.flink.client.program.Client.run(Client.java:349)
> 	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
> 	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> 	at org.apache.flink.api.java.DataSet.collect(DataSet.java:408)
> 	at org.apache.flink.api.java.DataSet.print(DataSet.java:1346)
> 	at de.robertmetzger.WordCount.main(WordCount.java:73)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> 	at org.apache.flink.client.program.Client.run(Client.java:315)
> 	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
> 	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
> 	at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873)
> 	at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870)
> 	at org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:415)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> 	at org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47)
> 	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870)
> 	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job b319a28f62855917901cfb67c5457142 (Flink Java Job at Thu Aug 20 10:46:41 PDT 2015)
> 	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:594)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> 	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> 	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> 	at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> 	at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Delegation Token can be issued only with kerberos or web authentication
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7086)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:506)
> 	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getDelegationToken(AuthorizationProviderProxyClientProtocol.java:637)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:957)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:415)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
> 	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
> 	at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)
> 	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)
> 	... 21 more
> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token can be issued only with kerberos or web authentication
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7086)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:506)
> 	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getDelegationToken(AuthorizationProviderProxyClientProtocol.java:637)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:957)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:587)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:415)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1410)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1363)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
> 	at com.sun.proxy.$Proxy14.getDelegationToken(Unknown Source)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
> 	at com.sun.proxy.$Proxy14.getDelegationToken(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:854)
> 	at org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:924)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1336)
> 	at org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:527)
> 	at org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:505)
> 	at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:121)
> 	at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
> 	at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
> 	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:241)
> 	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:375)
> 	at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:140)
> 	at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:51)
> 	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
> 	... 23 more
> The exception above occurred while trying to run your command.
> {code}
> This issue has been reported by a user: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Using-HadoopInputFormat-files-from-Flink-Yarn-in-a-secure-cluster-gives-an-error-td2472.html


