Posted to user@spark.apache.org by Stelios Philippou <st...@gmail.com> on 2021/08/31 07:44:21 UTC

Spark Stream on Kubernetes Cannot Set up JavaSparkContext

Hello,

I have been facing the following issue for some time now, and I was wondering
if someone might have some insight into how I can resolve it.

The code (Java 11) works correctly on my local machine, but whenever I
try to launch it on K8s I get the following error.

21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error initializing
SparkContext.

java.util.ServiceConfigurationError:
org.apache.spark.scheduler.ExternalClusterManager:
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
subtype



I have a Spark job that monitors some directories and handles the data
accordingly.

That part works correctly on K8s, and the SparkContext has no issue
being initialized there.


This is the spark-submit for that job:


spark-submit \
--master=k8s://https://url:port \
--deploy-mode cluster \
--name a-name\
--conf spark.driver.userClassPathFirst=true  \
--conf spark.kubernetes.file.upload.path=hdfs://upload-path \
--files "application-dev.properties,keystore.jks,truststore.jks"  \
--conf spark.kubernetes.container.image=url/spark:spark-submit \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.namespace=spark \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.dynamicAllocation.enabled=false \
--driver-memory 525m --executor-memory 525m \
--num-executors 1 --executor-cores 1 \
target/SparkStream.jar continuous-merge


My issue comes when I try to launch the service that listens to Kafka
events and stores them in HDFS.


spark-submit \
--master=k8s://https://url:port \
--deploy-mode cluster \
--name consume-data \
--conf spark.driver.userClassPathFirst=true  \
--conf spark.kubernetes.file.upload.path=hdfs://upload-path \
--files "application-dev.properties,keystore.jks,truststore.jks"  \
--jars "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
\
--conf spark.kubernetes.container.image=url/spark:spark-submit \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
--conf spark.kubernetes.namespace=spark \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.dynamicAllocation.enabled=false \
--driver-memory 1g --executor-memory 1g \
--num-executors 1 --executor-cores 1 \
target/SparkStream.jar consume


It could be that I am launching the application incorrectly, or perhaps my
K8s cluster is not configured correctly?



I have stripped my code down to the bare bones and still end up with the
following issue:


21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error initializing
SparkContext.

java.util.ServiceConfigurationError:
org.apache.spark.scheduler.ExternalClusterManager:
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
subtype

at java.base/java.util.ServiceLoader.fail(Unknown Source)

at
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(Unknown
Source)

at
java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(Unknown
Source)

at java.base/java.util.ServiceLoader$2.hasNext(Unknown Source)

at java.base/java.util.ServiceLoader$3.hasNext(Unknown Source)

at
scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:


21/08/31 07:28:42 WARN
org.springframework.context.annotation.AnnotationConfigApplicationContext:
Exception encountered during context initialization - cancelling refresh
attempt: org.springframework.beans.factory.UnsatisfiedDependencyException:
Error creating bean with name 'mainApplication': Unsatisfied dependency
expressed through field 'streamAllKafkaData'; nested exception is
org.springframework.beans.factory.UnsatisfiedDependencyException: Error
creating bean with name 'streamAllKafkaData': Unsatisfied dependency
expressed through field 'javaSparkContext'; nested exception is
org.springframework.beans.factory.BeanCreationException: Error creating
bean with name 'javaSparkContext' defined in class path resource
[com/configuration/SparkConfiguration.class]: Bean instantiation via
factory method failed; nested exception is
org.springframework.beans.BeanInstantiationException: Failed to instantiate
[org.apache.spark.api.java.JavaSparkContext]: Factory method
'javaSparkContext' threw exception; nested exception is
java.util.ServiceConfigurationError:
org.apache.spark.scheduler.ExternalClusterManager:
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
subtype

21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
Application run failed

org.springframework.beans.factory.UnsatisfiedDependencyException: Error
creating bean with name 'mainApplication': Unsatisfied dependency expressed
through field 'streamAllKafkaData'; nested exception is
org.springframework.beans.factory.UnsatisfiedDependencyException: Error
creating bean with name 'streamAllKafkaData': Unsatisfied dependency
expressed through field 'javaSparkContext'; nested exception is
org.springframework.beans.factory.BeanCreationException: Error creating
bean with name 'javaSparkContext' defined in class path resource
[com/configuration/SparkConfiguration.class]: Bean instantiation via
factory method failed; nested exception is
org.springframework.beans.BeanInstantiationException: Failed to instantiate
[org.apache.spark.api.java.JavaSparkContext]: Factory method
'javaSparkContext' threw exception; nested exception is
java.util.ServiceConfigurationError:
org.apache.spark.scheduler.ExternalClusterManager:
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
subtype
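
For reference, the 'javaSparkContext' bean named in the trace comes from a
plain Spring @Configuration factory, roughly like this (a simplified sketch of
the wiring, not the exact class):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SparkConfiguration {

  @Bean
  public SparkConf sparkConf() {
    // spark-submit passes spark.master, spark.app.name, etc. to the driver JVM,
    // so an empty SparkConf picks up the k8s:// master on its own.
    return new SparkConf();
  }

  @Bean
  public JavaSparkContext javaSparkContext(SparkConf sparkConf) {
    // This is the factory method that throws: creating the SparkContext makes
    // Spark look up ExternalClusterManager implementations via ServiceLoader,
    // and KubernetesClusterManager is rejected as "not a subtype".
    return new JavaSparkContext(sparkConf);
  }
}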




It could be that I am launching the Kafka application incorrectly, with all
the extra jars added?

It is just that those seem to be needed, as I get other errors when I do not
include them.
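
To make the "not a subtype" message itself less mysterious: java.util.ServiceLoader
throws exactly that when the service interface it checks against and the provider
class it loads come from two different classloaders, which is what
userClassPathFirst plus a second copy of the Spark jars can produce. A tiny,
Spark-free illustration of the failure mode (all names are made up for the
example; run it from a normal classes directory or plain jar):

import java.net.URL;
import java.net.URLClassLoader;

public class NotASubtypeDemo {

  public interface Service { }                     // stands in for ExternalClusterManager
  public static class Impl implements Service { }  // stands in for KubernetesClusterManager

  public static void main(String[] args) throws Exception {
    // Where this demo's own classes live on disk.
    URL here = NotASubtypeDemo.class.getProtectionDomain().getCodeSource().getLocation();

    // A child-first loader with no parent delegation, similar in spirit to
    // spark.driver.userClassPathFirst=true: it loads its own copy of everything.
    ClassLoader childFirst = new URLClassLoader(new URL[]{here}, null);
    Class<?> implFromOtherLoader = childFirst.loadClass(Impl.class.getName());

    // Same class name, but its Service interface is a *different* Class object
    // than ours, so the subtype check fails -- which is exactly what
    // ServiceLoader reports as "... not a subtype".
    System.out.println(Service.class.isAssignableFrom(implFromOtherLoader));  // prints false
  }
}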




Any help will be greatly appreciated.



Cheers,

Stelios

Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hm,

I had similar issues. I built the Docker image with Java 8 and that worked
on K8s. Have you tried building your Docker image with Java 8?

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

Posted by Stelios Philippou <st...@gmail.com>.
My local spark-submit:
 ~/development/SimpleKafkaStream  spark-submit --version



Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.10
Branch HEAD
Compiled by user centos on 2021-05-24T04:27:48Z


On K8s I have one image for Java 8 and one for Java 11.
The K8s Java 8 Docker env:

/opt/spark/bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_302

The K8s Java 11 env:
:/opt/spark/work-dir$ /opt/spark/bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/

Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 11.0.11


I will downgrade now to check against 3.1.1 as you mentioned, but as this is
only a patch-level difference I don't believe there should be any issues there.
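
To rule out the classloader angle that Jacek raised, I am also thinking of
running a small throwaway diagnostic in the driver (just a sketch, not part of
the project) that prints where each class from the error is actually loaded
from. If ExternalClusterManager and KubernetesClusterManager come from
different jars or classloaders, the "not a subtype" error would be expected;
a ClassNotFoundException here would also be informative.

import java.security.CodeSource;

public final class SparkClasspathProbe {

  private static void probe(String className) {
    try {
      Class<?> c = Class.forName(className);
      CodeSource src = c.getProtectionDomain().getCodeSource();
      System.out.println(className
          + "\n   from: " + (src == null ? "<bootstrap/runtime>" : src.getLocation())
          + "\n   by:   " + c.getClassLoader());
    } catch (Throwable t) {
      System.out.println(className + " -> " + t);
    }
  }

  public static void main(String[] args) {
    // The classes named in the original error.
    probe("org.apache.spark.SparkContext");
    probe("org.apache.spark.scheduler.ExternalClusterManager");
    probe("org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager");
  }
}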

On Mon, 6 Sept 2021 at 16:12, Mich Talebzadeh <mi...@gmail.com>
wrote:

>
>    1. which version of Spark the docker is built for
>    2. Which version of spark-submit you are using to submit this job
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 6 Sept 2021 at 14:07, Stelios Philippou <st...@gmail.com>
> wrote:
>
>> Yes on Local mode both from intelli and using spark-submit on my machine
>> and on a windows machine work.
>>
>> I have noticed the following error when adding this in the above
>> spark-submit for k8
>>
>> --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \
>>
>>
>> :: resolving dependencies ::
>> org.apache.spark#spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c;1.0
>>
>> confs: [default]
>>
>> Exception in thread "main" java.io.FileNotFoundException:
>> /opt/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c-1.0.xml
>> (No such file or directory)
>>
>>
>>
>> is there some way to verify that the k8 installation is correct ?
>>
>> Other spark processes that do not have streaming involved do work
>> correctly.
>>
>> On Mon, 6 Sept 2021 at 16:03, Mich Talebzadeh <mi...@gmail.com>
>> wrote:
>>
>>>
>>> Hi,
>>>
>>>
>>> Have you tried this on local mode as opposed to Kubernetes to see if it
>>> works?
>>>
>>>
>>> HTH
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Mon, 6 Sept 2021 at 11:16, Stelios Philippou <st...@gmail.com>
>>> wrote:
>>>
>>>> Hello Jacek,
>>>>
>>>> Yes, this is a Spark Streaming app.
>>>> I have removed all my code and created a new project with just the base
>>>> code that is enough to open a stream and loop over it, to see what I am
>>>> doing wrong.
>>>>
>>>> Not adding the packages results in the following error:
>>>>
>>>> 21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
>>>> task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
>>>> java.lang.ClassNotFoundException:
>>>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>>>
>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>>
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>>
>>>> at java.lang.Class.forName0(Native Method)
>>>>
>>>> at java.lang.Class.forName(Class.java:348)
>>>>
>>>> at
>>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>>>
>>>>
>>>> Which should not really be the case, because this should be included in
>>>> the Kubernetes pod. Is there any way I can confirm this?
>>>>
>>>>
>>>> So my simple class is as follows:
>>>>
>>>>
>>>> streamingContext = new JavaStreamingContext(javaSparkContext, Durations.seconds(5));
>>>>
>>>> stream = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
>>>>    ConsumerStrategies.Subscribe(topics, kafkaConfiguration));
>>>>
>>>> stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, byte[]>>>) rdd -> {
>>>>    try {
>>>>       rdd.foreachPartition(partition -> {
>>>>          while (partition.hasNext()) {
>>>>             ConsumerRecord<String, byte[]> consumerRecord = partition.next();
>>>>             LOGGER.info("WORKING " + consumerRecord.topic() +consumerRecord.partition() + ": "+consumerRecord.offset());
>>>>          }
>>>>       });
>>>>    } catch (Exception e) {
>>>>       e.printStackTrace();
>>>>    }
>>>> });
>>>>
>>>> streamingContext.start();
>>>> try {
>>>>    streamingContext.awaitTermination();
>>>> } catch (InterruptedException e) {
>>>>    e.printStackTrace();
>>>> } finally {
>>>>    streamingContext.stop();
>>>>    javaSparkContext.stop();
>>>> }
>>>>
>>>>
>>>> This is all there is to the class, which is a Spring Boot @Component.
>>>>
>>>> My pom is as follows:
>>>>
>>>> <?xml version="1.0" encoding="UTF-8"?>
>>>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>>   <modelVersion>4.0.0</modelVersion>
>>>>
>>>>   <groupId>com.kafka</groupId>
>>>>   <artifactId>SimpleKafkaStream</artifactId>
>>>>   <version>1.0</version>
>>>>
>>>>   <packaging>jar</packaging>
>>>>
>>>>   <properties>
>>>>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>>     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>>>>     <maven.compiler.source>8</maven.compiler.source>
>>>>     <maven.compiler.target>8</maven.compiler.target>
>>>>     <start-class>com.kafka.Main</start-class>
>>>>   </properties>
>>>>
>>>>   <parent>
>>>>     <groupId>org.springframework.boot</groupId>
>>>>     <artifactId>spring-boot-starter-parent</artifactId>
>>>>     <version>2.4.2</version>
>>>>     <relativePath/>
>>>>   </parent>
>>>>
>>>>   <dependencies>
>>>>     <dependency>
>>>>       <groupId>org.springframework.boot</groupId>
>>>>       <artifactId>spring-boot-starter</artifactId>
>>>>       <exclusions>
>>>>         <exclusion>
>>>>           <groupId>org.springframework.boot</groupId>
>>>>           <artifactId>spring-boot-starter-logging</artifactId>
>>>>         </exclusion>
>>>>       </exclusions>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-core_2.12</artifactId>
>>>>       <version>3.1.2</version>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
>>>>       <version>3.1.2</version>
>>>>       <scope>provided</scope>
>>>>     </dependency>
>>>>
>>>>     <dependency>
>>>>       <groupId>org.apache.spark</groupId>
>>>>       <artifactId>spark-streaming_2.12</artifactId>
>>>>       <version>3.1.2</version>
>>>>     </dependency>
>>>>
>>>>   </dependencies>
>>>>
>>>>   <build>
>>>>     <plugins>
>>>>       <plugin>
>>>>         <groupId>org.springframework.boot</groupId>
>>>>         <artifactId>spring-boot-maven-plugin</artifactId>
>>>>       </plugin>
>>>>
>>>>       <plugin>
>>>>         <groupId>org.apache.maven.plugins</groupId>
>>>>         <artifactId>maven-compiler-plugin</artifactId>
>>>>         <version>3.8.1</version>
>>>>         <configuration>
>>>>           <source>1.8</source>
>>>>           <target>1.8</target>
>>>>         </configuration>
>>>>       </plugin>
>>>>
>>>>     </plugins>
>>>>   </build>
>>>>
>>>> </project>
>>>>
>>>> A simple pom; whether the spark-streaming-kafka-0-10_2.12 scope is set to
>>>> provided or not, it still gives the same error.
>>>>
>>>> I have also tried to build an uber jar to test with, but I was still
>>>> unable to make it work, using the following build section:
>>>>
>>>> <build>
>>>>   <plugins>
>>>>     <plugin>
>>>>       <groupId>org.springframework.boot</groupId>
>>>>       <artifactId>spring-boot-maven-plugin</artifactId>
>>>>       <configuration>
>>>>         <fork>true</fork>
>>>>         <mainClass>com.kafka.Main</mainClass>
>>>>       </configuration>
>>>>       <executions>
>>>>         <execution>
>>>>           <goals>
>>>>             <goal>repackage</goal>
>>>>           </goals>
>>>>         </execution>
>>>>       </executions>
>>>>     </plugin>
>>>>     <plugin>
>>>>       <artifactId>maven-assembly-plugin</artifactId>
>>>>       <version>3.2.0</version>
>>>>       <configuration>
>>>>         <descriptorRefs>
>>>>           <descriptorRef>dependencies</descriptorRef>
>>>>         </descriptorRefs>
>>>>         <archive>
>>>>           <manifest>
>>>>             <addClasspath>true</addClasspath>
>>>>             <mainClass>com.kafka.Main</mainClass>
>>>>           </manifest>
>>>>         </archive>
>>>>       </configuration>
>>>>       <executions>
>>>>         <execution>
>>>>           <id>make-assembly</id>
>>>>           <phase>package</phase>
>>>>           <goals>
>>>>             <goal>single</goal>
>>>>           </goals>
>>>>         </execution>
>>>>       </executions>
>>>>     </plugin>
>>>>
>>>>     <plugin>
>>>>       <groupId>org.apache.maven.plugins</groupId>
>>>>       <artifactId>maven-compiler-plugin</artifactId>
>>>>       <version>3.8.1</version>
>>>>       <configuration>
>>>>         <source>1.8</source>
>>>>         <target>1.8</target>
>>>>       </configuration>
>>>>     </plugin>
>>>>
>>>>   </plugins>
>>>>
>>>> </build>
>>>>
>>>> I am open to any suggestions and implementations as to why this is not
>>>> working and what needs to be done.
>>>>
>>>>
>>>> Thank you for your time,
>>>>
>>>> Stelios
>>>>
>>>> On Sun, 5 Sept 2021 at 16:56, Jacek Laskowski <ja...@japila.pl> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> No idea still, but noticed
>>>>> "org.apache.spark.streaming.kafka010.KafkaRDDPartition" and "--jars
>>>>> "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
>>>>> \" that bothers me quite a lot.
>>>>>
>>>>> First of all, it's a Spark Streaming (not Structured Streaming) app.
>>>>> Correct? Please upgrade at your earliest convenience since it's no longer
>>>>> in active development (if supported at all).
>>>>>
>>>>> Secondly, why are these jars listed explicitly since they're part of
>>>>> Spark? You should not really be doing such risky config changes (unless
>>>>> you've got no other choice and you know what you're doing).
>>>>>
>>>>> Pozdrawiam,
>>>>> Jacek Laskowski
>>>>> ----
>>>>> https://about.me/JacekLaskowski
>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>
>>>>> <https://twitter.com/jaceklaskowski>
>>>>>
>>>>>
>>>>> On Tue, Aug 31, 2021 at 1:00 PM Stelios Philippou <st...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yes you are right.
>>>>>> I am using Spring Boot for this.
>>>>>>
>>>>>> The same setup does work for the job that does not involve any Kafka
>>>>>> events. But then I am not shipping extra jars, so nothing is
>>>>>> replaced and we are using the default ones.
>>>>>>
>>>>>> If I do not use userClassPathFirst, which forces the service
>>>>>> to use the newer version, I end up with the same problem.
>>>>>>
>>>>>> We are using protobuf v3+, and as such we need to push that version,
>>>>>> since Spark core uses an older one.
>>>>>>
>>>>>> So all we should really need is the following: --jars
>>>>>> "protobuf-java-3.17.3.jar" \
>>>>>> and here we need userClassPathFirst=true in order to use the
>>>>>> latest version.
>>>>>>
>>>>>>
>>>>>> Using only this jar (as it works locally), or with no jars defined, we
>>>>>> ended up with the following error.
>>>>>>
>>>>>> 21/08/31 10:53:40 WARN  org.apache.spark.scheduler.TaskSetManager:
>>>>>> Lost task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
>>>>>> java.lang.ClassNotFoundException:
>>>>>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>>>>>
>>>>>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>>>
>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>
>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>
>>>>>> at java.base/java.lang.Class.forName0(Native Method)
>>>>>>
>>>>>> at java.base/java.lang.Class.forName(Unknown Source)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Which can be resolved with passing more jars.
>>>>>>
>>>>>>
>>>>>> Any idea about this error?
>>>>>>
>>>>>> K8s does not seem to like this. Spring should be the one responsible
>>>>>> for the version, but it seems K8s does not like these versions.
>>>>>>
>>>>>> Perhaps a misconfiguration on K8s?
>>>>>>
>>>>>> I haven't set that up, so I am not aware of what was done there.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Downgrading to Java 8 on my K8s cluster might not be so easy. I want to
>>>>>> explore whether there is something else before doing that, as we would
>>>>>> need to spin up new instances of K8s to check it.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thank you for the time taken
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, 31 Aug 2021 at 12:26, Jacek Laskowski <ja...@japila.pl>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Stelios,
>>>>>>>
>>>>>>> I've never seen this error before, but a couple of things caught
>>>>>>> my attention that I would look at closer to chase the root cause of the
>>>>>>> issue.
>>>>>>>
>>>>>>> "org.springframework.context.annotation.AnnotationConfigApplicationContext:"
>>>>>>> and "21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>>>>>> Application run failed" seem to indicate that you're using Spring Boot
>>>>>>> (that I know almost nothing about so take the following with a pinch of
>>>>>>> salt :))
>>>>>>>
>>>>>>> Spring Boot manages the classpath by itself and together with
>>>>>>> another interesting option in your
>>>>>>> spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how
>>>>>>> much this exception:
>>>>>>>
>>>>>>> > org.apache.spark.scheduler.ExternalClusterManager:
>>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>>> subtype
>>>>>>>
>>>>>>> could be due to casting compatible types from two different
>>>>>>> classloaders?
>>>>>>>
>>>>>>> Just a thought but wanted to share as I think it's worth
>>>>>>> investigating.
>>>>>>>
>>>>>>> Pozdrawiam,
>>>>>>> Jacek Laskowski
>>>>>>> ----
>>>>>>> https://about.me/JacekLaskowski
>>>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>>>
>>>>>>> <https://twitter.com/jaceklaskowski>
>>>>>>>
>>>>>>>

Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

Posted by Mich Talebzadeh <mi...@gmail.com>.
   1. Which version of Spark is the Docker image built for?
   2. Which version of spark-submit are you using to submit this job?



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.





Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

Posted by Stelios Philippou <st...@gmail.com>.
Yes, in local mode it works both from IntelliJ and via spark-submit, on my
machine and on a Windows machine.

I have noticed the following error when adding this to the above
spark-submit for K8:

--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \


:: resolving dependencies ::
org.apache.spark#spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c;1.0

confs: [default]

Exception in thread "main" java.io.FileNotFoundException:
/opt/spark/.ivy2/cache/resolved-org.apache.spark-spark-submit-parent-683eee8e-9409-49ea-b0a9-7cf871af7f0c-1.0.xml
(No such file or directory)
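
One possible workaround, assuming the problem is only that the default Ivy
home inside the container (/opt/spark/.ivy2) is not writable, would be to
point Ivy at a writable path when using --packages (an untested sketch,
spark.jars.ivy is the relevant setting):

--conf spark.jars.ivy=/tmp/.ivy2 \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \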



Is there some way to verify that the K8 installation is correct?

Other Spark processes that do not involve streaming work correctly.
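
One further smoke test I could run (a sketch only, assuming the stock Spark
3.1.2 image layout, which ships the examples jar under
/opt/spark/examples/jars) is the standard SparkPi job against the cluster:

spark-submit \
--master k8s://https://url:port \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.container.image=url/spark:spark-submit \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.namespace=spark \
local:///opt/spark/examples/jars/spark-examples_2.12-3.1.2.jar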

On Mon, 6 Sept 2021 at 16:03, Mich Talebzadeh <mi...@gmail.com>
wrote:

>
> Hi,
>
>
> Have you tried this on local mode as opposed to Kubernetes to see if it
> works?
>
>
> HTH
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 6 Sept 2021 at 11:16, Stelios Philippou <st...@gmail.com>
> wrote:
>
>> Hello Jacek,
>>
>> Yes this is a spark-streaming.
>>  I have removed all code and created a new project with just the base
>> code that is enough to open a stream and loop over it to see what i am
>> doing wrong.
>>
>> Not adding the packages would result me in the following error
>>
>> 21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
>> task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
>> java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>
>> at java.lang.Class.forName0(Native Method)
>>
>> at java.lang.Class.forName(Class.java:348)
>>
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>
>>
>> Which should not really be the case cause this should be included in the
>> kubernetes pod. Anyway I can confirm this ?
>>
>>
>> So my simple class is as follow :
>>
>>
>> streamingContext = new JavaStreamingContext(javaSparkContext, Durations.seconds(5));
>>
>> stream = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
>>    ConsumerStrategies.Subscribe(topics, kafkaConfiguration));
>>
>> stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, byte[]>>>) rdd -> {
>>    try {
>>       rdd.foreachPartition(partition -> {
>>          while (partition.hasNext()) {
>>             ConsumerRecord<String, byte[]> consumerRecord = partition.next();
>>             LOGGER.info("WORKING " + consumerRecord.topic() +consumerRecord.partition() + ": "+consumerRecord.offset());
>>          }
>>       });
>>    } catch (Exception e) {
>>       e.printStackTrace();
>>    }
>> });
>>
>> streamingContext.start();
>> try {
>>    streamingContext.awaitTermination();
>> } catch (InterruptedException e) {
>>    e.printStackTrace();
>> } finally {
>>    streamingContext.stop();
>>    javaSparkContext.stop();
>> }
>>
>>
>> This is all there is too the class which is a java boot @Component.
>>
>> Now in order my pom is as such
>>
>> <?xml version="1.0" encoding="UTF-8"?>
>> <project xmlns="http://maven.apache.org/POM/4.0.0"
>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>   <modelVersion>4.0.0</modelVersion>
>>
>>   <groupId>com.kafka</groupId>
>>   <artifactId>SimpleKafkaStream</artifactId>
>>   <version>1.0</version>
>>
>>   <packaging>jar</packaging>
>>
>>   <properties>
>>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>>     <maven.compiler.source>8</maven.compiler.source>
>>     <maven.compiler.target>8</maven.compiler.target>
>>     <start-class>com.kafka.Main</start-class>
>>   </properties>
>>
>>   <parent>
>>     <groupId>org.springframework.boot</groupId>
>>     <artifactId>spring-boot-starter-parent</artifactId>
>>     <version>2.4.2</version>
>>     <relativePath/>
>>   </parent>
>>
>>   <dependencies>
>>     <dependency>
>>       <groupId>org.springframework.boot</groupId>
>>       <artifactId>spring-boot-starter</artifactId>
>>       <exclusions>
>>         <exclusion>
>>           <groupId>org.springframework.boot</groupId>
>>           <artifactId>spring-boot-starter-logging</artifactId>
>>         </exclusion>
>>       </exclusions>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.apache.spark</groupId>
>>       <artifactId>spark-core_2.12</artifactId>
>>       <version>3.1.2</version>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.apache.spark</groupId>
>>       <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
>>       <version>3.1.2</version>
>>       <scope>provided</scope>
>>     </dependency>
>>
>>     <dependency>
>>       <groupId>org.apache.spark</groupId>
>>       <artifactId>spark-streaming_2.12</artifactId>
>>       <version>3.1.2</version>
>>     </dependency>
>>
>>   </dependencies>
>>
>>   <build>
>>     <plugins>
>>       <plugin>
>>         <groupId>org.springframework.boot</groupId>
>>         <artifactId>spring-boot-maven-plugin</artifactId>
>>       </plugin>
>>
>>       <plugin>
>>         <groupId>org.apache.maven.plugins</groupId>
>>         <artifactId>maven-compiler-plugin</artifactId>
>>         <version>3.8.1</version>
>>         <configuration>
>>           <source>1.8</source>
>>           <target>1.8</target>
>>         </configuration>
>>       </plugin>
>>
>>     </plugins>
>>   </build>
>>
>> </project>
>>
>> a simple pom that even the  spark-streaming-kafka-0-10_2.12 scope is
>> provided or not it would stilly give the same error.
>>
>> I have tried to build an uber jar in order to test with that but i was
>> still unable to make it work as such :
>>
>> <build>
>>   <plugins>
>>     <plugin>
>>       <groupId>org.springframework.boot</groupId>
>>       <artifactId>spring-boot-maven-plugin</artifactId>
>>       <configuration>
>>         <fork>true</fork>
>>         <mainClass>com.kafka.Main</mainClass>
>>       </configuration>
>>       <executions>
>>         <execution>
>>           <goals>
>>             <goal>repackage</goal>
>>           </goals>
>>         </execution>
>>       </executions>
>>     </plugin>
>>     <plugin>
>>       <artifactId>maven-assembly-plugin</artifactId>
>>       <version>3.2.0</version>
>>       <configuration>
>>         <descriptorRefs>
>>           <descriptorRef>dependencies</descriptorRef>
>>         </descriptorRefs>
>>         <archive>
>>           <manifest>
>>             <addClasspath>true</addClasspath>
>>             <mainClass>com.kafka.Main</mainClass>
>>           </manifest>
>>         </archive>
>>       </configuration>
>>       <executions>
>>         <execution>
>>           <id>make-assembly</id>
>>           <phase>package</phase>
>>           <goals>
>>             <goal>single</goal>
>>           </goals>
>>         </execution>
>>       </executions>
>>     </plugin>
>>
>>     <plugin>
>>       <groupId>org.apache.maven.plugins</groupId>
>>       <artifactId>maven-compiler-plugin</artifactId>
>>       <version>3.8.1</version>
>>       <configuration>
>>         <source>1.8</source>
>>         <target>1.8</target>
>>       </configuration>
>>     </plugin>
>>
>>   </plugins>
>>
>> </build>
>>
>>  I am open to any suggestions and implementations in why this is not
>> working and what needs to be done.
>>
>>
>> Thank you for your time,
>>
>> Stelios
>>
>> On Sun, 5 Sept 2021 at 16:56, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> No idea still, but noticed
>>> "org.apache.spark.streaming.kafka010.KafkaRDDPartition" and "--jars
>>> "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
>>> \" that bothers me quite a lot.
>>>
>>> First of all, it's a Spark Streaming (not Structured Streaming) app.
>>> Correct? Please upgrade at your earliest convenience since it's no longer
>>> in active development (if supported at all).
>>>
>>> Secondly, why are these jars listed explicitly since they're part of
>>> Spark? You should not really be doing such risky config changes (unless
>>> you've got no other choice and you know what you're doing).
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> <https://twitter.com/jaceklaskowski>
>>>
>>>
>>> On Tue, Aug 31, 2021 at 1:00 PM Stelios Philippou <st...@gmail.com>
>>> wrote:
>>>
>>>> Yes you are right.
>>>> I am using Spring Boot for this.
>>>>
>>>> The same does work for the event that does not involve any kafka
>>>> events. But again i am not sending out extra jars there so nothing is
>>>> replaced and we are using the default ones.
>>>>
>>>> If i do not use the userClassPathFirst which will force the service to
>>>> use the newer version i will end up with the same problem
>>>>
>>>> We are using protobuf v3+ and as such we need to push that version
>>>> since apache core uses an older version.
>>>>
>>>> So all we should really need is the following : --jars
>>>> "protobuf-java-3.17.3.jar" \
>>>> and here we need the userClassPathFirst=true in order to use the latest
>>>> version.
>>>>
>>>>
>>>> Using only this jar as it works on local or no jars defined we ended up
>>>> with the following error.
>>>>
>>>> 21/08/31 10:53:40 WARN  org.apache.spark.scheduler.TaskSetManager:
>>>> Lost task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
>>>> java.lang.ClassNotFoundException:
>>>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>>>
>>>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>
>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>
>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>
>>>> at java.base/java.lang.Class.forName0(Native Method)
>>>>
>>>> at java.base/java.lang.Class.forName(Unknown Source)
>>>>
>>>> at
>>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>>>
>>>>
>>>>
>>>>
>>>> Which can be resolved with passing more jars.
>>>>
>>>>
>>>> Any idea about this error ?
>>>>
>>>> K8 does not seem to like this, but Java Spring should be the one that
>>>> is responsible for the version but it seems K8 does not like this versions.
>>>>
>>>> Perhaps miss configuration on K8 ?
>>>>
>>>> I haven't set that up so i am not aware of what was done there.
>>>>
>>>>
>>>>
>>>> For downgrading to java 8 on my K8 might not be so easy. I want to
>>>> explore if there is something else before doing that as we will need to
>>>> spin off new instances of K8 to check that.
>>>>
>>>>
>>>>
>>>> Thank you for the time taken
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, 31 Aug 2021 at 12:26, Jacek Laskowski <ja...@japila.pl> wrote:
>>>>
>>>>> Hi Stelios,
>>>>>
>>>>> I've never seen this error before, but a couple of things caught
>>>>> my attention that I would look at closer to chase the root cause of the
>>>>> issue.
>>>>>
>>>>> "org.springframework.context.annotation.AnnotationConfigApplicationContext:"
>>>>> and "21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>>>> Application run failed" seem to indicate that you're using Spring Boot
>>>>> (that I know almost nothing about so take the following with a pinch of
>>>>> salt :))
>>>>>
>>>>> Spring Boot manages the classpath by itself and together with another
>>>>> interesting option in your
>>>>> spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how
>>>>> much this exception:
>>>>>
>>>>> > org.apache.spark.scheduler.ExternalClusterManager:
>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>> subtype
>>>>>
>>>>> could be due to casting compatible types from two different
>>>>> classloaders?
>>>>>
>>>>> Just a thought but wanted to share as I think it's worth investigating.
>>>>>
>>>>> Pozdrawiam,
>>>>> Jacek Laskowski
>>>>> ----
>>>>> https://about.me/JacekLaskowski
>>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>>
>>>>> <https://twitter.com/jaceklaskowski>
>>>>>
>>>>>
>>>>> On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou <st...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have been facing the current issue for some time now and I was
>>>>>> wondering if someone might have some inside on how I can resolve the
>>>>>> following.
>>>>>>
>>>>>> The code (java 11) is working correctly on my local machine but
>>>>>> whenever I try to launch the following on K8 I am getting the following
>>>>>> error.
>>>>>>
>>>>>> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
>>>>>> initializing SparkContext.
>>>>>>
>>>>>> java.util.ServiceConfigurationError:
>>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>> subtype
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have a spark that will monitor some directories and handle the data
>>>>>> accordingly.
>>>>>>
>>>>>> That part is working correctly on K8 and the SparkContext has no
>>>>>> issue being initialized there.
>>>>>>
>>>>>>
>>>>>> This is the spark-submit for that
>>>>>>
>>>>>>
>>>>>> spark-submit \
>>>>>> --master=k8s://https://url:port \
>>>>>> --deploy-mode cluster \
>>>>>> --name a-name\
>>>>>> --conf spark.driver.userClassPathFirst=true  \
>>>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>>>>>> --files "application-dev.properties,keystore.jks,truststore.jks"  \
>>>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>>>> --conf spark.kubernetes.namespace=spark \
>>>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>>>> --conf spark.dynamicAllocation.enabled=false \
>>>>>> --driver-memory 525m --executor-memory 525m \
>>>>>> --num-executors 1 --executor-cores 1 \
>>>>>> target/SparkStream.jar continuous-merge
>>>>>>
>>>>>>
>>>>>> My issue comes when I try to launch the service in order to listen to
>>>>>> kafka events and store them in HDFS.
>>>>>>
>>>>>>
>>>>>> spark-submit \
>>>>>> --master=k8s://https://url:port \
>>>>>> --deploy-mode cluster \
>>>>>> --name consume-data \
>>>>>> --conf spark.driver.userClassPathFirst=true  \
>>>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path\
>>>>>> --files "application-dev.properties,keystore.jks,truststore.jks"  \
>>>>>> --jars "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar" \
>>>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>>>> --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
>>>>>> --conf spark.kubernetes.namespace=spark \
>>>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>>>> --conf spark.dynamicAllocation.enabled=false \
>>>>>> --driver-memory 1g --executor-memory 1g \
>>>>>> --num-executors 1 --executor-cores 1 \
>>>>>> target/SparkStream.jar consume
>>>>>>
>>>>>>
>>>>>> It could be that I am launching the application wrongly or perhaps
>>>>>> that my K8 is not configured correctly ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have stripped down my code and left it barebone and will end up
>>>>>> with the following issue :
>>>>>>
>>>>>>
>>>>>> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
>>>>>> initializing SparkContext.
>>>>>>
>>>>>> java.util.ServiceConfigurationError:
>>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>> subtype
>>>>>>
>>>>>> at java.base/java.util.ServiceLoader.fail(Unknown Source)
>>>>>>
>>>>>> at
>>>>>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(Unknown
>>>>>> Source)
>>>>>>
>>>>>> at
>>>>>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(Unknown
>>>>>> Source)
>>>>>>
>>>>>> at java.base/java.util.ServiceLoader$2.hasNext(Unknown Source)
>>>>>>
>>>>>> at java.base/java.util.ServiceLoader$3.hasNext(Unknown Source)
>>>>>>
>>>>>> at
>>>>>> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:
>>>>>>
>>>>>>
>>>>>> 21/08/31 07:28:42 WARN  org.springframework.context.annotation.AnnotationConfigApplicationContext:
>>>>>> Exception encountered during context initialization - cancelling refresh
>>>>>> attempt: org.springframework.beans.factory.UnsatisfiedDependencyException:
>>>>>> Error creating bean with name 'mainApplication': Unsatisfied dependency
>>>>>> expressed through field 'streamAllKafkaData'; nested exception is
>>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>>>> expressed through field 'javaSparkContext'; nested exception is
>>>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>>>> bean with name 'javaSparkContext' defined in class path resource
>>>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>>>> factory method failed; nested exception is
>>>>>> org.springframework.beans.BeanInstantiationException: Failed to instantiate
>>>>>> [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>>>> 'javaSparkContext' threw exception; nested exception is
>>>>>> java.util.ServiceConfigurationError:
>>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>> subtype
>>>>>>
>>>>>> 21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>>>>> Application run failed
>>>>>>
>>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException:
>>>>>> Error creating bean with name 'mainApplication': Unsatisfied dependency
>>>>>> expressed through field 'streamAllKafkaData'; nested exception is
>>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>>>> expressed through field 'javaSparkContext'; nested exception is
>>>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>>>> bean with name 'javaSparkContext' defined in class path resource
>>>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>>>> factory method failed; nested exception is
>>>>>> org.springframework.beans.BeanInstantiationException: Failed to instantiate
>>>>>> [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>>>> 'javaSparkContext' threw exception; nested exception is
>>>>>> java.util.ServiceConfigurationError:
>>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>>> subtype
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> It could be that i am launching the application for Kafka wrongly
>>>>>> with all the extra jars added ?
>>>>>>
>>>>>> Just that those seem to be needed or i am getting other errors when
>>>>>> not including those.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Any help will be greatly appreciated.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Stelios
>>>>>>
>>>>>>
>>>>>>
>>>>>>

Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi,


Have you tried this in local mode, as opposed to Kubernetes, to see if it
works?


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 6 Sept 2021 at 11:16, Stelios Philippou <st...@gmail.com> wrote:

> Hello Jacek,
>
> Yes this is a spark-streaming.
>  I have removed all code and created a new project with just the base code
> that is enough to open a stream and loop over it to see what i am doing
> wrong.
>
> Not adding the packages would result me in the following error
>
> 21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
> task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
> java.lang.ClassNotFoundException:
> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>
> at java.lang.Class.forName0(Native Method)
>
> at java.lang.Class.forName(Class.java:348)
>
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>
>
> Which should not really be the case cause this should be included in the
> kubernetes pod. Anyway I can confirm this ?
>
>
> So my simple class is as follow :
>
>
> streamingContext = new JavaStreamingContext(javaSparkContext, Durations.seconds(5));
>
> stream = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
>    ConsumerStrategies.Subscribe(topics, kafkaConfiguration));
>
> stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, byte[]>>>) rdd -> {
>    try {
>       rdd.foreachPartition(partition -> {
>          while (partition.hasNext()) {
>             ConsumerRecord<String, byte[]> consumerRecord = partition.next();
>             LOGGER.info("WORKING " + consumerRecord.topic() +consumerRecord.partition() + ": "+consumerRecord.offset());
>          }
>       });
>    } catch (Exception e) {
>       e.printStackTrace();
>    }
> });
>
> streamingContext.start();
> try {
>    streamingContext.awaitTermination();
> } catch (InterruptedException e) {
>    e.printStackTrace();
> } finally {
>    streamingContext.stop();
>    javaSparkContext.stop();
> }
>
>
> This is all there is too the class which is a java boot @Component.
>
> Now in order my pom is as such
>
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>   <modelVersion>4.0.0</modelVersion>
>
>   <groupId>com.kafka</groupId>
>   <artifactId>SimpleKafkaStream</artifactId>
>   <version>1.0</version>
>
>   <packaging>jar</packaging>
>
>   <properties>
>     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
>     <maven.compiler.source>8</maven.compiler.source>
>     <maven.compiler.target>8</maven.compiler.target>
>     <start-class>com.kafka.Main</start-class>
>   </properties>
>
>   <parent>
>     <groupId>org.springframework.boot</groupId>
>     <artifactId>spring-boot-starter-parent</artifactId>
>     <version>2.4.2</version>
>     <relativePath/>
>   </parent>
>
>   <dependencies>
>     <dependency>
>       <groupId>org.springframework.boot</groupId>
>       <artifactId>spring-boot-starter</artifactId>
>       <exclusions>
>         <exclusion>
>           <groupId>org.springframework.boot</groupId>
>           <artifactId>spring-boot-starter-logging</artifactId>
>         </exclusion>
>       </exclusions>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-core_2.12</artifactId>
>       <version>3.1.2</version>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
>       <version>3.1.2</version>
>       <scope>provided</scope>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-streaming_2.12</artifactId>
>       <version>3.1.2</version>
>     </dependency>
>
>   </dependencies>
>
>   <build>
>     <plugins>
>       <plugin>
>         <groupId>org.springframework.boot</groupId>
>         <artifactId>spring-boot-maven-plugin</artifactId>
>       </plugin>
>
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-compiler-plugin</artifactId>
>         <version>3.8.1</version>
>         <configuration>
>           <source>1.8</source>
>           <target>1.8</target>
>         </configuration>
>       </plugin>
>
>     </plugins>
>   </build>
>
> </project>
>
> a simple pom that even the  spark-streaming-kafka-0-10_2.12 scope is
> provided or not it would stilly give the same error.
>
> I have tried to build an uber jar in order to test with that but i was
> still unable to make it work as such :
>
> <build>
>   <plugins>
>     <plugin>
>       <groupId>org.springframework.boot</groupId>
>       <artifactId>spring-boot-maven-plugin</artifactId>
>       <configuration>
>         <fork>true</fork>
>         <mainClass>com.kafka.Main</mainClass>
>       </configuration>
>       <executions>
>         <execution>
>           <goals>
>             <goal>repackage</goal>
>           </goals>
>         </execution>
>       </executions>
>     </plugin>
>     <plugin>
>       <artifactId>maven-assembly-plugin</artifactId>
>       <version>3.2.0</version>
>       <configuration>
>         <descriptorRefs>
>           <descriptorRef>dependencies</descriptorRef>
>         </descriptorRefs>
>         <archive>
>           <manifest>
>             <addClasspath>true</addClasspath>
>             <mainClass>com.kafka.Main</mainClass>
>           </manifest>
>         </archive>
>       </configuration>
>       <executions>
>         <execution>
>           <id>make-assembly</id>
>           <phase>package</phase>
>           <goals>
>             <goal>single</goal>
>           </goals>
>         </execution>
>       </executions>
>     </plugin>
>
>     <plugin>
>       <groupId>org.apache.maven.plugins</groupId>
>       <artifactId>maven-compiler-plugin</artifactId>
>       <version>3.8.1</version>
>       <configuration>
>         <source>1.8</source>
>         <target>1.8</target>
>       </configuration>
>     </plugin>
>
>   </plugins>
>
> </build>
>
>  I am open to any suggestions and implementations in why this is not
> working and what needs to be done.
>
>
> Thank you for your time,
>
> Stelios
>
> On Sun, 5 Sept 2021 at 16:56, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> No idea still, but noticed
>> "org.apache.spark.streaming.kafka010.KafkaRDDPartition" and "--jars
>> "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
>> \" that bothers me quite a lot.
>>
>> First of all, it's a Spark Streaming (not Structured Streaming) app.
>> Correct? Please upgrade at your earliest convenience since it's no longer
>> in active development (if supported at all).
>>
>> Secondly, why are these jars listed explicitly since they're part of
>> Spark? You should not really be doing such risky config changes (unless
>> you've got no other choice and you know what you're doing).
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> <https://twitter.com/jaceklaskowski>
>>
>>
>> On Tue, Aug 31, 2021 at 1:00 PM Stelios Philippou <st...@gmail.com>
>> wrote:
>>
>>> Yes you are right.
>>> I am using Spring Boot for this.
>>>
>>> The same does work for the event that does not involve any kafka events.
>>> But again i am not sending out extra jars there so nothing is replaced and
>>> we are using the default ones.
>>>
>>> If i do not use the userClassPathFirst which will force the service to
>>> use the newer version i will end up with the same problem
>>>
>>> We are using protobuf v3+ and as such we need to push that version since
>>> apache core uses an older version.
>>>
>>> So all we should really need is the following : --jars
>>> "protobuf-java-3.17.3.jar" \
>>> and here we need the userClassPathFirst=true in order to use the latest
>>> version.
>>>
>>>
>>> Using only this jar as it works on local or no jars defined we ended up
>>> with the following error.
>>>
>>> 21/08/31 10:53:40 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
>>> task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
>>> java.lang.ClassNotFoundException:
>>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>>
>>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>
>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>
>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>
>>> at java.base/java.lang.Class.forName0(Native Method)
>>>
>>> at java.base/java.lang.Class.forName(Unknown Source)
>>>
>>> at
>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>>
>>>
>>>
>>>
>>> Which can be resolved with passing more jars.
>>>
>>>
>>> Any idea about this error ?
>>>
>>> K8 does not seem to like this, but Java Spring should be the one that is
>>> responsible for the version but it seems K8 does not like this versions.
>>>
>>> Perhaps miss configuration on K8 ?
>>>
>>> I haven't set that up so i am not aware of what was done there.
>>>
>>>
>>>
>>> For downgrading to java 8 on my K8 might not be so easy. I want to
>>> explore if there is something else before doing that as we will need to
>>> spin off new instances of K8 to check that.
>>>
>>>
>>>
>>> Thank you for the time taken
>>>
>>>
>>>
>>>
>>> On Tue, 31 Aug 2021 at 12:26, Jacek Laskowski <ja...@japila.pl> wrote:
>>>
>>>> Hi Stelios,
>>>>
>>>> I've never seen this error before, but a couple of things caught
>>>> my attention that I would look at closer to chase the root cause of the
>>>> issue.
>>>>
>>>> "org.springframework.context.annotation.AnnotationConfigApplicationContext:"
>>>> and "21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>>> Application run failed" seem to indicate that you're using Spring Boot
>>>> (that I know almost nothing about so take the following with a pinch of
>>>> salt :))
>>>>
>>>> Spring Boot manages the classpath by itself and together with another
>>>> interesting option in your
>>>> spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how
>>>> much this exception:
>>>>
>>>> > org.apache.spark.scheduler.ExternalClusterManager:
>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>> subtype
>>>>
>>>> could be due to casting compatible types from two different
>>>> classloaders?
>>>>
>>>> Just a thought but wanted to share as I think it's worth investigating.
>>>>
>>>> Pozdrawiam,
>>>> Jacek Laskowski
>>>> ----
>>>> https://about.me/JacekLaskowski
>>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>>> Follow me on https://twitter.com/jaceklaskowski
>>>>
>>>> <https://twitter.com/jaceklaskowski>
>>>>
>>>>
>>>> On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou <st...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have been facing the current issue for some time now and I was
>>>>> wondering if someone might have some inside on how I can resolve the
>>>>> following.
>>>>>
>>>>> The code (java 11) is working correctly on my local machine but
>>>>> whenever I try to launch the following on K8 I am getting the following
>>>>> error.
>>>>>
>>>>> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
>>>>> initializing SparkContext.
>>>>>
>>>>> java.util.ServiceConfigurationError:
>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>> subtype
>>>>>
>>>>>
>>>>>
>>>>> I have a spark that will monitor some directories and handle the data
>>>>> accordingly.
>>>>>
>>>>> That part is working correctly on K8 and the SparkContext has no issue
>>>>> being initialized there.
>>>>>
>>>>>
>>>>> This is the spark-submit for that
>>>>>
>>>>>
>>>>> spark-submit \
>>>>> --master=k8s://https://url:port \
>>>>> --deploy-mode cluster \
>>>>> --name a-name\
>>>>> --conf spark.driver.userClassPathFirst=true  \
>>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>>>>> --files "application-dev.properties,keystore.jks,truststore.jks"  \
>>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>>> --conf spark.kubernetes.namespace=spark \
>>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>>> --conf spark.dynamicAllocation.enabled=false \
>>>>> --driver-memory 525m --executor-memory 525m \
>>>>> --num-executors 1 --executor-cores 1 \
>>>>> target/SparkStream.jar continuous-merge
>>>>>
>>>>>
>>>>> My issue comes when I try to launch the service in order to listen to
>>>>> kafka events and store them in HDFS.
>>>>>
>>>>>
>>>>> spark-submit \
>>>>> --master=k8s://https://url:port \
>>>>> --deploy-mode cluster \
>>>>> --name consume-data \
>>>>> --conf spark.driver.userClassPathFirst=true  \
>>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path\
>>>>> --files "application-dev.properties,keystore.jks,truststore.jks"  \
>>>>> --jars "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar" \
>>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>>> --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
>>>>> --conf spark.kubernetes.namespace=spark \
>>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>>> --conf spark.dynamicAllocation.enabled=false \
>>>>> --driver-memory 1g --executor-memory 1g \
>>>>> --num-executors 1 --executor-cores 1 \
>>>>> target/SparkStream.jar consume
>>>>>
>>>>>
>>>>> It could be that I am launching the application wrongly or perhaps
>>>>> that my K8 is not configured correctly ?
>>>>>
>>>>>
>>>>>
>>>>> I have stripped down my code and left it barebone and will end up with
>>>>> the following issue :
>>>>>
>>>>>
>>>>> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
>>>>> initializing SparkContext.
>>>>>
>>>>> java.util.ServiceConfigurationError:
>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>> subtype
>>>>>
>>>>> at java.base/java.util.ServiceLoader.fail(Unknown Source)
>>>>>
>>>>> at
>>>>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(Unknown
>>>>> Source)
>>>>>
>>>>> at
>>>>> java.base/java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(Unknown
>>>>> Source)
>>>>>
>>>>> at java.base/java.util.ServiceLoader$2.hasNext(Unknown Source)
>>>>>
>>>>> at java.base/java.util.ServiceLoader$3.hasNext(Unknown Source)
>>>>>
>>>>> at
>>>>> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:
>>>>>
>>>>>
>>>>> 21/08/31 07:28:42 WARN  org.springframework.context.annotation.AnnotationConfigApplicationContext:
>>>>> Exception encountered during context initialization - cancelling refresh
>>>>> attempt: org.springframework.beans.factory.UnsatisfiedDependencyException:
>>>>> Error creating bean with name 'mainApplication': Unsatisfied dependency
>>>>> expressed through field 'streamAllKafkaData'; nested exception is
>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>>> expressed through field 'javaSparkContext'; nested exception is
>>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>>> bean with name 'javaSparkContext' defined in class path resource
>>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>>> factory method failed; nested exception is
>>>>> org.springframework.beans.BeanInstantiationException: Failed to instantiate
>>>>> [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>>> 'javaSparkContext' threw exception; nested exception is
>>>>> java.util.ServiceConfigurationError:
>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>> subtype
>>>>>
>>>>> 21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>>>> Application run failed
>>>>>
>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException:
>>>>> Error creating bean with name 'mainApplication': Unsatisfied dependency
>>>>> expressed through field 'streamAllKafkaData'; nested exception is
>>>>> org.springframework.beans.factory.UnsatisfiedDependencyException: Error
>>>>> creating bean with name 'streamAllKafkaData': Unsatisfied dependency
>>>>> expressed through field 'javaSparkContext'; nested exception is
>>>>> org.springframework.beans.factory.BeanCreationException: Error creating
>>>>> bean with name 'javaSparkContext' defined in class path resource
>>>>> [com/configuration/SparkConfiguration.class]: Bean instantiation via
>>>>> factory method failed; nested exception is
>>>>> org.springframework.beans.BeanInstantiationException: Failed to instantiate
>>>>> [org.apache.spark.api.java.JavaSparkContext]: Factory method
>>>>> 'javaSparkContext' threw exception; nested exception is
>>>>> java.util.ServiceConfigurationError:
>>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>>> subtype
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> It could be that i am launching the application for Kafka wrongly with
>>>>> all the extra jars added ?
>>>>>
>>>>> Just that those seem to be needed or i am getting other errors when
>>>>> not including those.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Any help will be greatly appreciated.
>>>>>
>>>>>
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Stelios
>>>>>
>>>>>
>>>>>
>>>>>

Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

Posted by Stelios Philippou <st...@gmail.com>.
Hello Jacek,

Yes, this is a Spark Streaming application.
I have removed all the code and created a new project with just the base code
that is enough to open a stream and loop over it, to see what I am doing
wrong.

Not adding the packages results in the following error:

21/09/06 08:10:41 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
task 0.0 in stage 0.0 (TID 0) (10.60.60.128 executor 1):
java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka010.KafkaRDDPartition

at java.net.URLClassLoader.findClass(URLClassLoader.java:382)

at java.lang.ClassLoader.loadClass(ClassLoader.java:418)

at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

at java.lang.Class.forName0(Native Method)

at java.lang.Class.forName(Class.java:348)

at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)


This should not really be the case, because this class should be included in
the Kubernetes pod. Is there any way I can confirm this?
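
(A possible way to check, assuming kubectl access to the spark namespace and
the usual /opt/spark/jars layout inside the image; <driver-pod-name> is a
placeholder:

kubectl -n spark get pods
kubectl -n spark exec <driver-pod-name> -- ls /opt/spark/jars | grep -i kafka
)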


So my simple class is as follows:


streamingContext = new JavaStreamingContext(javaSparkContext, Durations.seconds(5));

stream = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
   ConsumerStrategies.Subscribe(topics, kafkaConfiguration));

stream.foreachRDD((VoidFunction<JavaRDD<ConsumerRecord<String, byte[]>>>) rdd -> {
   try {
      rdd.foreachPartition(partition -> {
         while (partition.hasNext()) {
            ConsumerRecord<String, byte[]> consumerRecord = partition.next();
            LOGGER.info("WORKING " + consumerRecord.topic() + consumerRecord.partition() + ": " + consumerRecord.offset());
         }
      });
   } catch (Exception e) {
      e.printStackTrace();
   }
});

streamingContext.start();
try {
   streamingContext.awaitTermination();
} catch (InterruptedException e) {
   e.printStackTrace();
} finally {
   streamingContext.stop();
   javaSparkContext.stop();
}


This is all there is to the class, which is a Spring Boot @Component.

Now, my pom is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.kafka</groupId>
  <artifactId>SimpleKafkaStream</artifactId>
  <version>1.0</version>

  <packaging>jar</packaging>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <start-class>com.kafka.Main</start-class>
  </properties>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.2</version>
    <relativePath/>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
      <version>3.1.2</version>
      <scope>provided</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>3.1.2</version>
    </dependency>

  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

    </plugins>
  </build>

</project>

A simple pom; whether the spark-streaming-kafka-0-10_2.12 dependency is
marked as provided or not, it still gives the same error.

I have also tried to build an uber jar to test with, but I was still unable
to make it work, using the following build section:

<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
      <configuration>
        <fork>true</fork>
        <mainClass>com.kafka.Main</mainClass>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>repackage</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.2.0</version>
      <configuration>
        <descriptorRefs>
          <descriptorRef>dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <addClasspath>true</addClasspath>
            <mainClass>com.kafka.Main</mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.8.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>

  </plugins>

</build>

I am open to any suggestions as to why this is not working and what needs to
be done.


Thank you for your time,

Stelios

On Sun, 5 Sept 2021 at 16:56, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> No idea still, but noticed
> "org.apache.spark.streaming.kafka010.KafkaRDDPartition" and "--jars
> "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar"
> \" that bothers me quite a lot.
>
> First of all, it's a Spark Streaming (not Structured Streaming) app.
> Correct? Please upgrade at your earliest convenience since it's no longer
> in active development (if supported at all).
>
> Secondly, why are these jars listed explicitly since they're part of
> Spark? You should not really be doing such risky config changes (unless
> you've got no other choice and you know what you're doing).
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
> <https://twitter.com/jaceklaskowski>
>
>
> On Tue, Aug 31, 2021 at 1:00 PM Stelios Philippou <st...@gmail.com>
> wrote:
>
>> Yes you are right.
>> I am using Spring Boot for this.
>>
>> The same does work for the event that does not involve any kafka events.
>> But again i am not sending out extra jars there so nothing is replaced and
>> we are using the default ones.
>>
>> If i do not use the userClassPathFirst which will force the service to
>> use the newer version i will end up with the same problem
>>
>> We are using protobuf v3+ and as such we need to push that version since
>> apache core uses an older version.
>>
>> So all we should really need is the following : --jars
>> "protobuf-java-3.17.3.jar" \
>> and here we need the userClassPathFirst=true in order to use the latest
>> version.
>>
>>
>> Using only this jar as it works on local or no jars defined we ended up
>> with the following error.
>>
>> 21/08/31 10:53:40 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
>> task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
>> java.lang.ClassNotFoundException:
>> org.apache.spark.streaming.kafka010.KafkaRDDPartition
>>
>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>
>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>
>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>
>> at java.base/java.lang.Class.forName0(Native Method)
>>
>> at java.base/java.lang.Class.forName(Unknown Source)
>>
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>>
>>
>>
>>
>> Which can be resolved with passing more jars.
>>
>>
>> Any idea about this error ?
>>
>> K8 does not seem to like this, but Java Spring should be the one that is
>> responsible for the version but it seems K8 does not like this versions.
>>
>> Perhaps miss configuration on K8 ?
>>
>> I haven't set that up so i am not aware of what was done there.
>>
>>
>>
>> For downgrading to java 8 on my K8 might not be so easy. I want to
>> explore if there is something else before doing that as we will need to
>> spin off new instances of K8 to check that.
>>
>>
>>
>> Thank you for the time taken
>>
>>
>>
>>
>> On Tue, 31 Aug 2021 at 12:26, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi Stelios,
>>>
>>> I've never seen this error before, but a couple of things caught
>>> my attention that I would look at closer to chase the root cause of the
>>> issue.
>>>
>>> "org.springframework.context.annotation.AnnotationConfigApplicationContext:"
>>> and "21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
>>> Application run failed" seem to indicate that you're using Spring Boot
>>> (that I know almost nothing about so take the following with a pinch of
>>> salt :))
>>>
>>> Spring Boot manages the classpath by itself and together with another
>>> interesting option in your
>>> spark-submit, spark.driver.userClassPathFirst=true, makes me wonder how
>>> much this exception:
>>>
>>> > org.apache.spark.scheduler.ExternalClusterManager:
>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>> subtype
>>>
>>> could be due to casting compatible types from two different classloaders?
>>>
>>> Just a thought but wanted to share as I think it's worth investigating.
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> "The Internals Of" Online Books <https://books.japila.pl/>
>>> Follow me on https://twitter.com/jaceklaskowski
>>>
>>> <https://twitter.com/jaceklaskowski>
>>>
>>>
>>> On Tue, Aug 31, 2021 at 9:44 AM Stelios Philippou <st...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have been facing the current issue for some time now and I was
>>>> wondering if someone might have some inside on how I can resolve the
>>>> following.
>>>>
>>>> The code (java 11) is working correctly on my local machine but
>>>> whenever I try to launch the following on K8 I am getting the following
>>>> error.
>>>>
>>>> 21/08/31 07:28:42 ERROR  org.apache.spark.SparkContext: Error
>>>> initializing SparkContext.
>>>>
>>>> java.util.ServiceConfigurationError:
>>>> org.apache.spark.scheduler.ExternalClusterManager:
>>>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
>>>> subtype
>>>>
>>>>
>>>>
>>>> I have a spark that will monitor some directories and handle the data
>>>> accordingly.
>>>>
>>>> That part is working correctly on K8 and the SparkContext has no issue
>>>> being initialized there.
>>>>
>>>>
>>>> This is the spark-submit for that
>>>>
>>>>
>>>> spark-submit \
>>>> --master=k8s://https://url:port \
>>>> --deploy-mode cluster \
>>>> --name a-name\
>>>> --conf spark.driver.userClassPathFirst=true  \
>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path \
>>>> --files "application-dev.properties,keystore.jks,truststore.jks"  \
>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>> --conf spark.kubernetes.namespace=spark \
>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>> --conf spark.dynamicAllocation.enabled=false \
>>>> --driver-memory 525m --executor-memory 525m \
>>>> --num-executors 1 --executor-cores 1 \
>>>> target/SparkStream.jar continuous-merge
>>>>
>>>>
>>>> My issue comes when I try to launch the service in order to listen to
>>>> kafka events and store them in HDFS.
>>>>
>>>>
>>>> spark-submit \
>>>> --master=k8s://https://url:port \
>>>> --deploy-mode cluster \
>>>> --name consume-data \
>>>> --conf spark.driver.userClassPathFirst=true  \
>>>> --conf spark.kubernetes.file.upload.path=hdfs://upload-path\
>>>> --files "application-dev.properties,keystore.jks,truststore.jks"  \
>>>> --jars "spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar" \
>>>> --conf spark.kubernetes.container.image=url/spark:spark-submit \
>>>> --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
>>>> --conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
>>>> --conf spark.kubernetes.namespace=spark \
>>>> --conf spark.kubernetes.container.image.pullPolicy=Always \
>>>> --conf spark.dynamicAllocation.enabled=false \
>>>> --driver-memory 1g --executor-memory 1g \
>>>> --num-executors 1 --executor-cores 1 \
>>>> target/SparkStream.jar consume

Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

Still no idea, but I noticed
"org.apache.spark.streaming.kafka010.KafkaRDDPartition" and --jars
"spark-yarn_2.12-3.1.2.jar,spark-core_2.12-3.1.2.jar,kafka-clients-2.8.0.jar,spark-streaming-kafka-0-10_2.12-3.1.2.jar,spark-token-provider-kafka-0-10_2.12-3.1.2.jar",
and both bother me quite a lot.

First of all, it's a Spark Streaming (not Structured Streaming) app, correct?
Please upgrade to Structured Streaming at your earliest convenience, since
Spark Streaming (DStreams) is no longer in active development (if supported at
all).
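
A minimal sketch of what the Structured Streaming equivalent could look like in
Java (broker, topic and HDFS paths below are placeholders, and it assumes the
spark-sql-kafka-0-10 package is on the classpath):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StructuredKafkaSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("consume-data").getOrCreate();

        // Read the Kafka topic as a streaming Dataset instead of a DStream.
        Dataset<Row> events = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
                .option("subscribe", "some-topic")                   // placeholder
                .load();

        // Land the message payloads in HDFS as files; the checkpoint location
        // is what makes the file output fault-tolerant across restarts.
        events.selectExpr("CAST(value AS STRING)")
              .writeStream()
              .format("parquet")
              .option("path", "hdfs://some/output/path")             // placeholder
              .option("checkpointLocation", "hdfs://some/checkpoint") // placeholder
              .start()
              .awaitTermination();
    }
}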

Secondly, why are these jars listed explicitly since they're part of Spark?
You should not really be doing such risky config changes (unless you've got
no other choice and you know what you're doing).

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski




Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

Posted by Stelios Philippou <st...@gmail.com>.
Yes you are right.
I am using Spring Boot for this.
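
For reference, the javaSparkContext bean named in the stack trace comes from a
plain @Configuration factory method, roughly along these lines (a reconstruction
from the log, not the actual project code; the conf values are placeholders):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SparkConfiguration {

    @Bean
    public JavaSparkContext javaSparkContext() {
        // In cluster mode the master URL and most settings come from spark-submit,
        // so the SparkConf here is kept minimal.
        SparkConf conf = new SparkConf().setAppName("consume-data");
        return new JavaSparkContext(conf);
    }
}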

The same setup does work for the job that does not involve any Kafka events.
But then again, I am not shipping any extra jars there, so nothing is replaced
and we are using the default ones.

If I do not use userClassPathFirst, which forces the service to use the newer
version, I end up with the same problem.

We are using protobuf v3+, so we need to push that version since Spark core
ships an older one.

So all we should really need is the following: --jars
"protobuf-java-3.17.3.jar" \
and here we need userClassPathFirst=true in order to pick up the newer
version.


Using only this jar (the setup that works locally), or with no jars defined at
all, we ended up with the following error.

21/08/31 10:53:40 WARN  org.apache.spark.scheduler.TaskSetManager: Lost
task 0.0 in stage 18.0 (TID 139) (10.60.63.56 executor 1):
java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka010.KafkaRDDPartition

at java.base/java.net.URLClassLoader.findClass(Unknown Source)

at java.base/java.lang.ClassLoader.loadClass(Unknown Source)

at java.base/java.lang.ClassLoader.loadClass(Unknown Source)

at java.base/java.lang.Class.forName0(Native Method)

at java.base/java.lang.Class.forName(Unknown Source)

at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)




This can be resolved by passing more jars.
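
To illustrate what "passing more jars" amounts to, an alternative sketch (not
verified on this cluster) is to let spark-submit resolve the Kafka connector
and its transitive dependencies instead of listing each jar by hand, e.g.
replacing the --jars line with:

--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.1.2 \

That would pull kafka-clients and the token provider transitively, so only
genuinely external jars (like protobuf) would still need --jars.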


Any idea about this error?

K8 does not seem to like this. Java/Spring should be the one responsible for
the version, yet it seems K8 does not like these versions.

Perhaps a misconfiguration on K8?

I haven't set that cluster up, so I am not aware of what was done there.



Downgrading to Java 8 on my K8 might not be so easy. I want to explore whether
there is something else I can do before that, as we would need to spin up new
K8 instances to check it.



Thank you for the time taken





Re: Spark Stream on Kubernetes Cannot Set up JavaSparkContext

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi Stelios,

I've never seen this error before, but a couple of things caught my attention
that I would look at more closely to chase down the root cause of the issue.

"org.springframework.context.annotation.AnnotationConfigApplicationContext:"
and "21/08/31 07:28:42 ERROR  org.springframework.boot.SpringApplication:
Application run failed" seem to indicate that you're using Spring Boot
(that I know almost nothing about so take the following with a pinch of
salt :))

Spring Boot manages the classpath by itself, and together with another
interesting option in your spark-submit, spark.driver.userClassPathFirst=true,
it makes me wonder whether this exception:

> org.apache.spark.scheduler.ExternalClusterManager:
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager not a
> subtype

could be due to casting otherwise-compatible types that were loaded by two
different classloaders?
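
A toy illustration of that failure mode (nothing Spark-specific; just the same
class defined by two loaders and the JVM refusing the subtype relation):

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.Arrays;

public class ClassLoaderClash {
    public static void main(String[] args) throws Exception {
        // Turn the current classpath entries into URLs for a second, isolated loader.
        URL[] classpath = Arrays.stream(System.getProperty("java.class.path").split(File.pathSeparator))
                .map(ClassLoaderClash::toUrl)
                .toArray(URL[]::new);

        // Parent is the platform loader, so this loader cannot delegate to the
        // application loader and has to define its own copy of ClassLoaderClash.
        ClassLoader isolated = new URLClassLoader(classpath, ClassLoader.getPlatformClassLoader());

        Class<?> fromAppLoader = ClassLoaderClash.class;
        Class<?> fromIsolatedLoader = Class.forName(ClassLoaderClash.class.getName(), true, isolated);

        System.out.println("same binary name: " + fromAppLoader.getName().equals(fromIsolatedLoader.getName())); // true
        System.out.println("is a subtype:     " + fromAppLoader.isAssignableFrom(fromIsolatedLoader));           // false
    }

    private static URL toUrl(String path) {
        try {
            return Paths.get(path).toUri().toURL();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}

If the user jars contain Spark's own classes (spark-core, spark-yarn) and
userClassPathFirst=true is on, that kind of duplicate loading looks like a very
plausible way to end up with "KubernetesClusterManager not a subtype".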

Just a thought but wanted to share as I think it's worth investigating.

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski


