Posted to user@beam.apache.org by Trevor Kramer <tr...@gmail.com> on 2021/06/21 13:51:52 UTC

spark-submit and the portable runner

Does anyone have an example of using spark-submit to run a Beam job with
the portable runner? The documentation indicates this is possible but
doesn't give an example of how to do it.

I am using the following pipeline options to generate the jar

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(['--runner=SparkRunner',
                           '--environment_type=DOCKER',
                           '--environment_config=path-to-image:latest',
                           '--output_executable_path=output.jar'
                           ])

and am trying to run it with

spark-submit --master yarn --deploy-mode cluster \
    --class org.apache.beam.runners.spark.SparkPipelineRunner output.jar


but I get the following error


Exception in thread "main" java.lang.IllegalAccessError: class org.apache.hadoop.hdfs.web.HftpFileSystem cannot access its superinterface org.apache.hadoop.hdfs.web.TokenAspect$TokenManagementDelegator
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:3268)
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3313)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:123)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:482)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:230)
    at org.apache.spark.deploy.yarn.Client.$anonfun$appStagingBaseDir$2(Client.scala:138)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.deploy.yarn.Client.<init>(Client.scala:138)
    at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1526)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Re: spark-submit and the portable runner

Posted by Kyle Weaver <kc...@google.com>.
Beam's default Hadoop version is 2.10.1 [0]. It looks like you can build
the Beam Spark "classic" (non-portable) runner with Hadoop 3.2.1 [1].
However, it doesn't look like we publish separate jars for different Hadoop
versions, nor does the portable runner (job server) make the Hadoop version
readily configurable. I filed a JIRA for this [2]. The fix may be as simple
as changing the Spark runner configuration to "hadoopVersion321" [3], but I
haven't had a chance to verify.

Once you build a job server with the correct Hadoop version, you can
use it by setting the pipeline option --spark_job_server_jar.
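
As a rough illustration of that last step, the flags from the original post could be assembled together with --spark_job_server_jar as below. This is only a sketch: the helper name beam_spark_args and the jar path are hypothetical, and the code just builds the list of option strings without importing Beam.

```python
def beam_spark_args(job_server_jar, image, output_jar="output.jar"):
    """Assemble the pipeline option flags used to build a spark-submit jar.

    beam_spark_args is a hypothetical helper, not part of Beam.
    """
    return [
        "--runner=SparkRunner",
        "--environment_type=DOCKER",
        f"--environment_config={image}",
        f"--output_executable_path={output_jar}",
        # Use the locally built job server jar instead of the default one.
        f"--spark_job_server_jar={job_server_jar}",
    ]


# Hypothetical path: substitute the jar produced by your own build.
args = beam_spark_args("/path/to/beam-spark-job-server.jar",
                       "path-to-image:latest")
print(args)
```

With Beam installed, the resulting list would be passed to PipelineOptions(args) in place of the hard-coded list from the original post.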

[0]
[1] https://github.com/apache/beam/blob/cae01f09c72ee8dce1df5fca683c89534720ea9a/runners/spark/spark_runner.gradle#L53
[2] https://issues.apache.org/jira/browse/BEAM-12549
[3] https://github.com/apache/beam/blob/cae01f09c72ee8dce1df5fca683c89534720ea9a/runners/spark/job-server/spark_job_server.gradle#L55

On Thu, Jun 24, 2021 at 6:43 AM Trevor Kramer <tr...@gmail.com>
wrote:

> Thanks. That was the issue. The latest release of Beam indicates it
> supports Spark 3 and I find references in the code to Hadoop 3.2.1. Is it
> possible to configure Beam to run on Hadoop 3.2.1?
>
> Trevor

Re: spark-submit and the portable runner

Posted by Trevor Kramer <tr...@gmail.com>.
Thanks. That was the issue. The latest release of Beam indicates it
supports Spark 3 and I find references in the code to Hadoop 3.2.1. Is it
possible to configure Beam to run on Hadoop 3.2.1?

Trevor

On Mon, Jun 21, 2021 at 6:19 PM Kyle Weaver <kc...@google.com> wrote:

> Looks like a version mismatch between Hadoop dependencies [1]. Beam's
> Python wrapper for Spark is currently pinned to Spark 2.4.8 [2]. Which
> Spark and Hadoop versions is your cluster using?
>
> [1]
> https://stackoverflow.com/questions/62880009/error-through-remote-spark-job-java-lang-illegalaccesserror-class-org-apache-h
> [2] https://issues.apache.org/jira/browse/BEAM-12094

Re: spark-submit and the portable runner

Posted by Kyle Weaver <kc...@google.com>.
Looks like a version mismatch between Hadoop dependencies [1]. Beam's
Python wrapper for Spark is currently pinned to Spark 2.4.8 [2]. Which
Spark and Hadoop versions is your cluster using?

[1]
https://stackoverflow.com/questions/62880009/error-through-remote-spark-job-java-lang-illegalaccesserror-class-org-apache-h
[2] https://issues.apache.org/jira/browse/BEAM-12094
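
One quick way to answer the version question is to run `spark-submit --version` and `hadoop version` on the cluster and compare the results with the versions Beam is built against. The sketch below is only a heuristic, assuming a difference in major version is a reasonable first signal of this kind of mismatch. The helper names and the sample cluster string are hypothetical. Only the 2.4.8 pin comes from this thread.

```python
import re


def parse_version(text):
    """Return the first dotted x.y.z version in text as a tuple of ints, or None."""
    match = re.search(r"(\d+)\.(\d+)\.(\d+)", text)
    return tuple(int(part) for part in match.groups()) if match else None


def same_major(cluster, bundled):
    """True when both versions were parsed and share a major version."""
    return cluster is not None and bundled is not None and cluster[0] == bundled[0]


# Spark version the Python wrapper is pinned to, per the message above.
PINNED_SPARK = parse_version("2.4.8")

# Hypothetical cluster output; replace with the text your cluster prints.
cluster_spark = parse_version("version 3.1.2")

if not same_major(cluster_spark, PINNED_SPARK):
    print("Spark major versions differ:", cluster_spark, "vs", PINNED_SPARK)
```

The same comparison can be applied to the Hadoop version reported by the cluster.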
