Posted to user@beam.apache.org by Trevor Kramer <tr...@gmail.com> on 2021/06/21 13:51:52 UTC
spark-submit and the portable runner
Does anyone have an example of using spark-submit to run a Beam job with
the portable runner? The documentation indicates this is possible but
doesn't give an example of how to do it.
I am using the following pipeline options to generate the jar:
options = PipelineOptions([
    '--runner=SparkRunner',
    '--environment_type=DOCKER',
    '--environment_config=path-to-image:latest',
    '--output_executable_path=output.jar',
])
and am trying to run it with
spark-submit --master yarn --deploy-mode cluster \
  --class org.apache.beam.runners.spark.SparkPipelineRunner output.jar
but I get the following error
Exception in thread "main" java.lang.IllegalAccessError: class
org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface
org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:482)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:230)
at org.apache.spark.deploy.yarn.Client.$anonfun$appStagingBaseDir$2(Client.scala:138)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.deploy.yarn.Client.<init>(Client.scala:138)
at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1526)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
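For reference, the end-to-end flow being attempted can be sketched as two steps; the script name and image tag below are placeholders, not exact values:

```shell
# Step 1: run the pipeline script. With --output_executable_path set,
# the Beam Python SDK packages the pipeline into a self-executing jar
# instead of submitting the job directly. Whether these flags are passed
# on the command line or hard-coded in the PipelineOptions list makes
# no difference.
python my_pipeline.py \
  --runner=SparkRunner \
  --environment_type=DOCKER \
  --environment_config=path-to-image:latest \
  --output_executable_path=output.jar

# Step 2: hand the generated jar to spark-submit on the cluster;
# SparkPipelineRunner is the entry point that replays the pipeline.
spark-submit --master yarn --deploy-mode cluster \
  --class org.apache.beam.runners.spark.SparkPipelineRunner \
  output.jar
```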
Re: spark-submit and the portable runner
Posted by Kyle Weaver <kc...@google.com>.
Beam's default Hadoop version is 2.10.1 [0]. It looks like you can build
the Beam Spark "classic" (non-portable) runner with Hadoop 3.2.1 [1].
However, it doesn't look like we publish separate jars for different Hadoop
versions, nor does the portable runner (job server) make the Hadoop version
readily configurable. I filed a JIRA for this [2]. The fix may be as simple
as changing the Spark runner configuration to "hadoopVersion321" [3], but I
haven't gotten a chance to verify.
Then once you build a job server with the correct Hadoop version, you can
use it by setting the pipeline option --spark_job_server_jar.
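A rough sketch of that workflow, assuming a Beam source checkout; the exact way to select Hadoop 3.2.1 is unverified (see the gradle files in [1] and [3]), and the jar path and version here are illustrative:

```shell
# Build the Spark job server shadow jar from a Beam source checkout.
# How to switch it to the Hadoop 3.2.1 dependency set is the open
# question tracked in [2]; the default build uses Hadoop 2.10.1.
./gradlew :runners:spark:job-server:shadowJar

# Point the Python SDK at the locally built job server jar
# (path and version number are illustrative placeholders).
python my_pipeline.py \
  --runner=SparkRunner \
  --spark_job_server_jar=runners/spark/job-server/build/libs/beam-runners-spark-job-server-2.31.0-SNAPSHOT.jar
```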
[0]
[1] https://github.com/apache/beam/blob/cae01f09c72ee8dce1df5fca683c89534720ea9a/runners/spark/spark_runner.gradle#L53
[2] https://issues.apache.org/jira/browse/BEAM-12549
[3] https://github.com/apache/beam/blob/cae01f09c72ee8dce1df5fca683c89534720ea9a/runners/spark/job-server/spark_job_server.gradle#L55
On Thu, Jun 24, 2021 at 6:43 AM Trevor Kramer <tr...@gmail.com>
wrote:
> Thanks. That was the issue. The latest release of Beam indicates it
> supports Spark 3 and I find references in the code to Hadoop 3.2.1. Is it
> possible to configure beam to run on Hadoop 3.2.1?
>
> Trevor
>
> On Mon, Jun 21, 2021 at 6:19 PM Kyle Weaver <kc...@google.com> wrote:
>
>> Looks like a version mismatch between Hadoop dependencies [1]. Beam's
>> Python wrapper for Spark is currently pinned to Spark 2.4.8 [2]. Which
>> Spark and Hadoop versions are your cluster using?
>>
>> [1] https://stackoverflow.com/questions/62880009/error-through-remote-spark-job-java-lang-illegalaccesserror-class-org-apache-h
>> [2] https://issues.apache.org/jira/browse/BEAM-12094
>>
>> On Mon, Jun 21, 2021 at 9:52 AM Trevor Kramer <tr...@gmail.com>
>> wrote:
>>
>>> Does anyone have an example of using spark-submit to run a beam job
>>> using the portable runner? The documentation indicates this is possible but
>>> doesn't give an example of how to do it.
>>>
>>> I am using the following pipeline options to generate the jar
>>>
>>> options = PipelineOptions(['--runner=SparkRunner',
>>> '--environment_type=DOCKER',
>>> '--environment_config=path-to-image:latest',
>>> '--output_executable_path=output.jar'
>>> ])
>>>
>>> and am trying to run it with
>>>
>>> spark-submit --master yarn --deploy-mode cluster --class
>>> org.apache.beam.runners.spark.SparkPipelineRunner output.jar
>>>
>>>
>>> but I get the following error [stack trace trimmed; quoted in full above]
Re: spark-submit and the portable runner
Posted by Trevor Kramer <tr...@gmail.com>.
Thanks. That was the issue. The latest release of Beam indicates it
supports Spark 3, and I find references in the code to Hadoop 3.2.1. Is it
possible to configure Beam to run on Hadoop 3.2.1?
Trevor
On Mon, Jun 21, 2021 at 6:19 PM Kyle Weaver <kc...@google.com> wrote:
> Looks like a version mismatch between Hadoop dependencies [1]. Beam's
> Python wrapper for Spark is currently pinned to Spark 2.4.8 [2]. Which
> Spark and Hadoop versions are your cluster using?
>
> [1] https://stackoverflow.com/questions/62880009/error-through-remote-spark-job-java-lang-illegalaccesserror-class-org-apache-h
> [2] https://issues.apache.org/jira/browse/BEAM-12094
>
> On Mon, Jun 21, 2021 at 9:52 AM Trevor Kramer <tr...@gmail.com>
> wrote:
>
>> Does anyone have an example of using spark-submit to run a beam job using
>> the portable runner? The documentation indicates this is possible but
>> doesn't give an example of how to do it.
>>
>> I am using the following pipeline options to generate the jar
>>
>> options = PipelineOptions(['--runner=SparkRunner',
>> '--environment_type=DOCKER',
>> '--environment_config=path-to-image:latest',
>> '--output_executable_path=output.jar'
>> ])
>>
>> and am trying to run it with
>>
>> spark-submit --master yarn --deploy-mode cluster --class
>> org.apache.beam.runners.spark.SparkPipelineRunner output.jar
>>
>>
>> but I get the following error [stack trace trimmed; quoted in full above]
Re: spark-submit and the portable runner
Posted by Kyle Weaver <kc...@google.com>.
Looks like a version mismatch between Hadoop dependencies [1]. Beam's
Python wrapper for Spark is currently pinned to Spark 2.4.8 [2]. Which
Spark and Hadoop versions are your cluster using?
[1] https://stackoverflow.com/questions/62880009/error-through-remote-spark-job-java-lang-illegalaccesserror-class-org-apache-h
[2] https://issues.apache.org/jira/browse/BEAM-12094
On Mon, Jun 21, 2021 at 9:52 AM Trevor Kramer <tr...@gmail.com>
wrote:
> Does anyone have an example of using spark-submit to run a beam job using
> the portable runner? The documentation indicates this is possible but
> doesn't give an example of how to do it.
>
> I am using the following pipeline options to generate the jar
>
> options = PipelineOptions(['--runner=SparkRunner',
> '--environment_type=DOCKER',
> '--environment_config=path-to-image:latest',
> '--output_executable_path=output.jar'
> ])
>
> and am trying to run it with
>
> spark-submit --master yarn --deploy-mode cluster --class
> org.apache.beam.runners.spark.SparkPipelineRunner output.jar
>
>
> but I get the following error [stack trace trimmed; quoted in full above]