Posted to users@zeppelin.apache.org by Fabrizio Fab <fa...@tiscali.it> on 2021/10/25 19:58:31 UTC

Spark on k8s cluster mode, from outside of the cluster.

Dear all, I have been struggling with the following problem for more than a week.
My Zeppelin server runs outside the k8s cluster (there is a reason for this). I can run Spark Zeppelin notes in client mode, but not in cluster mode.

I see that, at first, spark-submit on the Zeppelin host creates a pod for the interpreter (RemoteInterpreterServer) on the cluster with deployMode=cluster, and this happens without errors. Then the interpreter itself runs another spark-submit (this time from the pod) with deployMode=client.

Specifically, this is the command line submitted by the interpreter from its pod:

/opt/spark/bin/spark-submit \
--conf spark.driver.bindAddress=<ip address of the interpreter pod> \
--deploy-mode client \
--properties-file /opt/spark/conf/spark.properties \
--class org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer \
spark-internal \
<ZEPPELIN_HOST> \
<ZEPPELIN_SERVER_RPC_PORT> \
<interpreter_name>-<user name>

At this point, the interpreter Pod remains in "Running" state, while the Zeppelin note remains in "Pending" forever.

The log of the Interpreter (level = DEBUG) at the end only says:
 INFO [2021-10-25 18:16:58,229] ({RemoteInterpreterServer-Thread} RemoteInterpreterServer.java[run]:194) Launching ThriftServer at <ip address of the interpreter pod>:<random port>
 INFO [2021-10-25 18:16:58,229] ({RegisterThread} RemoteInterpreterServer.java[run]:592) Start registration
 INFO [2021-10-25 18:16:58,332] ({RegisterThread} RemoteInterpreterServer.java[run]:606) Registering interpreter process
 INFO [2021-10-25 18:16:58,356] ({RegisterThread} RemoteInterpreterServer.java[run]:608) Registered interpreter process
 INFO [2021-10-25 18:16:58,356] ({RegisterThread} RemoteInterpreterServer.java[run]:629) Registration finished
(I replaced the real IP address and port with placeholders to make the log clearer.)

I am stuck at this point.
Can anyone help me? Thank you in advance. Fabrizio


Re: Spark on k8s cluster mode, from outside of the cluster.

Posted by Fabrizio Fab <fa...@tiscali.it>.
Thank you for your answer, Philipp.

interpreter.sh is the shell script run by the Zeppelin server. In particular, the following line, which you highlighted, starts the interpreter in CLUSTER MODE in my case:

INTERPRETER_RUN_COMMAND+=("${SPARK_SUBMIT}" "--class" "${ZEPPELIN_SERVER}" "--driver-class-path" "${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}" "--driver-java-options" "${JAVA_INTP_OPTS}" "${SPARK_SUBMIT_OPTIONS_ARRAY[@]}" "${ZEPPELIN_SPARK_CONF_ARRAY[@]}" "${SPARK_APP_JAR}" "${CALLBACK_HOST}" "${PORT}" "${INTP_GROUP_ID}" "${INTP_PORT}")

At this point, I can see that the interpreter is started on the cluster, in a pod, as expected.
But then THE INTERPRETER ITSELF runs a SECOND spark-submit.
To be clearer: this time it is NOT the Zeppelin server that runs spark-submit, and it is not the "interpreter.sh" script that is called;
this time it is the interpreter pod that runs the following:

/opt/spark/bin/spark-submit \
 --conf spark.driver.bindAddress=<ip address of the interpreter pod> \
 --deploy-mode client \
 --properties-file /opt/spark/conf/spark.properties \
 --class org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer \
 spark-internal \
 <ZEPPELIN_HOST> \
 <ZEPPELIN_SERVER_RPC_PORT> \
 <interpreter_name>-<user name>

As you can see, this second spark-submit is quite different from the first:

1) deploy-mode = CLIENT (not cluster)
2) the resource name is "spark-internal", not a jar file.

It seems that the 1st instance of the Spark interpreter (run by the Zeppelin server) should work as a bridge between the Zeppelin server and the 2nd instance of the Spark interpreter (run by this 1st instance), which should perform its ordinary duty of getting paragraphs from Zeppelin, running them, and sending the output back to Zeppelin.

In this case, what is not working as expected is the communication between the 3 processes:

1) Zeppelin server
2) "bridge" interpreter
3) "true" interpreter

Is it possible to get some lower-level technical information on the communication flows between the 2 interpreter instances and the Zeppelin server?

Many thanks again.
Fabrizio


On 2021/10/26 11:40:24, Philipp Dallig <ph...@gmail.com> wrote: 
> Hi Fabrizio,
> 
> At the moment I think Zeppelin does not support running Spark jobs in
> cluster mode. In fact, though, K8s mode simulates cluster mode, because the
> Zeppelin interpreter is already started as a pod in K8s, just as a manual
> spark-submit execution would do in cluster mode.
>
> spark-submit is called only once, during the start of the Zeppelin
> interpreter. You will find the call in these lines:
> https://github.com/apache/zeppelin/blob/2f55fe8ed277b28d71f858633f9c9d76fd18f0c3/bin/interpreter.sh#L303-L305
> 
> Best Regards
> Philipp

Re: Spark on k8s cluster mode, from outside of the cluster. [SOLVED]

Posted by Mich Talebzadeh <mi...@gmail.com>.
I read some of the notes here. I have been away from Zeppelin for a while
but have extensive experience with Spark on Kubernetes (k8s).

First of all, I assume that the Zeppelin server is just the client from which
you submit a job to Spark on k8s.

If Spark on k8s is offered as a service (say, Google GKE), then the GKE
cluster has a defined name (it has already been created), and you can look up
its master IP address with something like

KUBERNETES_MASTER_IP=$(gcloud container clusters list \
    --filter name=<NAME_OF_GKE_CLUSTER> \
    --format='value(MASTER_IP)')

This way you get the master IP address. On your client, where the Spark
binaries are installed (the same version as in the Docker image used inside
k8s, say Spark 3.1.1), you can then run spark-submit as follows (this example
uses Python):

       spark-submit --verbose \
           --properties-file ${property_file} \
           --master k8s://https://$KUBERNETES_MASTER_IP:443 \
           --deploy-mode cluster \
           --name sparkBQ \
           --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
           --py-files $CODE_DIRECTORY/DSBQ.zip \
           --conf spark.kubernetes.namespace=$NAMESPACE \
           --conf spark.network.timeout=300 \
           --conf spark.executor.instances=$NEXEC \
           --conf spark.kubernetes.driver.limit.cores=1 \
           <your PySpark application file>

Note that $NEXEC is the number of executors requested. Spark currently works
on the basis of the "one-container-per-Pod" model
<https://kubernetes.io/docs/concepts/workloads/pods/>, meaning that one node
of the cluster runs the driver and each remaining node runs one executor. So
if you have a 5-node k8s cluster, $NEXEC = 4.
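
Under the one-executor-per-node assumption described above, the executor count can be derived from the node count. The sketch below is only an illustration: the function name `executors_for_nodes` is made up for this example, and it simply reserves one node for the driver.

```shell
#!/usr/bin/env bash
# Hypothetical helper: derive the executor count from the node count,
# assuming one node is reserved for the driver and every other node
# hosts exactly one executor (the model described above).
executors_for_nodes() {
  local nodes="$1"
  if [ "$nodes" -lt 2 ]; then
    echo 0   # no node is left for executors
  else
    echo $(( nodes - 1 ))
  fi
}

# Example: a 5-node cluster leaves room for 4 executors.
NEXEC="$(executors_for_nodes 5)"
echo "NEXEC=$NEXEC"
```

On a live cluster the node count could be taken from `kubectl get nodes --no-headers | wc -l` and passed to the function.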

In this model, requesting more executors than there are nodes available for
them leaves the extra executors undeployed, in Pending status, as shown
below.
kubectl get pod -n spark

NAME                                         READY   STATUS    RESTARTS   AGE
randomdatabigquery-b40dd67c791417bf-exec-1   1/1     Running   0          65s
randomdatabigquery-b40dd67c791417bf-exec-2   1/1     Running   0          65s
randomdatabigquery-b40dd67c791417bf-exec-3   1/1     Running   0          65s
randomdatabigquery-b40dd67c791417bf-exec-4   1/1     Running   0          65s
randomdatabigquery-b40dd67c791417bf-exec-5   0/1     Pending   0          65s
sparkbq-13d8857c7913e1d0-driver              1/1     Running   0          81s

The Spark GUI can be accessed at run time through the following port
forwarding, once the driver pod has been created:


DRIVER_POD_NAME=`kubectl get pods -n spark |grep driver|awk '{print $1}'`

kubectl port-forward $DRIVER_POD_NAME 4040:4040 -n $NAMESPACE &

You can also inspect the driver pod and its logs:


 kubectl describe pod $DRIVER_POD_NAME -n $NAMESPACE

 kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
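
Selecting the driver with `grep driver` can match more than one pod when several applications run in the same namespace. A possible alternative, sketched below, relies on the `spark-role=driver` label that Spark sets on driver pods. The function name `driver_pod_name` is made up for this example, and the sketch assumes `kubectl` is already configured for the target cluster.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the name of the most recently created Spark
# driver pod in a namespace, selected by the spark-role=driver label.
driver_pod_name() {
  local namespace="$1"
  kubectl get pods -n "$namespace" \
    -l spark-role=driver \
    --sort-by=.metadata.creationTimestamp \
    -o name | tail -n 1 | sed 's|^pod/||'
}

# Usage (requires access to the cluster):
#   DRIVER_POD_NAME="$(driver_pod_name "$NAMESPACE")"
#   kubectl port-forward "$DRIVER_POD_NAME" 4040:4040 -n "$NAMESPACE" &
```

Sorting by creation time and taking the last entry is a design choice for the case where older driver pods are still listed; with a single application in the namespace it makes no difference.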



I would be surprised if this did not work with Zeppelin clients. You can also
build your own Docker image for k8s or use the one offered by your vendor.
We in the Spark community will shortly be offering ready-built Docker images
for different versions from the official Spark community site.


HTH







Re: Spark on k8s cluster mode, from outside of the cluster. [SOLVED]

Posted by Jeff Zhang <zj...@gmail.com>.
Thanks for sharing. It would be nice if you could write a blog post to share
it with a wider group of Zeppelin users.




-- 
Best Regards

Jeff Zhang

Re: Spark on k8s cluster mode, from outside of the cluster. [SOLVED]

Posted by Fabrizio Fab <fa...@tiscali.it>.
Yes! Thank you very much, Philipp: tonight I went carefully through the source code and found the part about the two Thrift servers.

So I solved my problem. Here is the solution I adopted, which may be useful to other people.

CONTEXT
My Zeppelin server is installed on a LAN where a K8s cluster is available, and I want to submit notes in cluster mode to the k8s cluster.

SOLUTION
- The driver pod must have its address exposed on the LAN; otherwise the Zeppelin server cannot connect to the interpreter's Thrift server. I suppose there are several ways of doing this, but I am not a k8s expert, so I simply created a basic driver-pod.template.yaml with a "hostNetwork" spec and referenced it through the "spark.kubernetes.driver.podTemplateFile" interpreter setting.

At this point, the 2 servers can talk to each other.
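
A pod template of the kind described above might look like the following sketch. The file location, the use of `mktemp`, and the `dnsPolicy` line are assumptions added for illustration and are not taken from the original setup; only `hostNetwork: true` and the `spark.kubernetes.driver.podTemplateFile` property come from the description.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Assumed location for the template; any path readable by spark-submit works.
TEMPLATE_FILE="${TEMPLATE_FILE:-$(mktemp -d)/driver-pod.template.yaml}"

# Minimal driver pod template: hostNetwork puts the driver (and the
# interpreter's Thrift server inside it) on the node's network, so a host
# on the LAN can reach it.
cat > "$TEMPLATE_FILE" <<'EOF'
apiVersion: v1
kind: Pod
spec:
  hostNetwork: true
  # Assumption: commonly paired with hostNetwork so that cluster DNS
  # names still resolve from inside the pod.
  dnsPolicy: ClusterFirstWithHostNet
EOF

# Interpreter setting that points Spark at the template.
echo "spark.kubernetes.driver.podTemplateFile=$TEMPLATE_FILE"
```

The printed property would then be added to the Spark interpreter settings in Zeppelin.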

NOTE
1) Do not set the Zeppelin run mode to k8s. It must be "local" (or the default "auto").
2) An NFS share (or another shared persistent volume) is required in order to upload the required JARs and to easily access the driver logs after the driver shuts down:

spark.kubernetes.driver.volumes.nfs.<whichever name>.options.server=<your server>
spark.kubernetes.driver.volumes.nfs.<whichever name>.options.path=<local path>
spark.kubernetes.driver.volumes.nfs.<whichever name>.mount.path=<mount path>
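
For illustration, the three properties could be generated for a concrete volume as sketched below. The volume name `shared`, the server `nfs.example.lan`, and both paths are placeholders invented for this example, and the function name `nfs_driver_volume_conf` is likewise hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the three Spark properties that mount an NFS
# export into the driver pod under the given volume name.
nfs_driver_volume_conf() {
  local name="$1" server="$2" export_path="$3" mount_path="$4"
  local prefix="spark.kubernetes.driver.volumes.nfs.${name}"
  printf '%s.options.server=%s\n' "$prefix" "$server"
  printf '%s.options.path=%s\n'   "$prefix" "$export_path"
  printf '%s.mount.path=%s\n'     "$prefix" "$mount_path"
}

# Placeholder values; replace them with your own server and paths.
nfs_driver_volume_conf shared nfs.example.lan /exports/zeppelin /mnt/shared
```

Each printed line is one interpreter property; keeping the volume name in a single variable avoids typos across the three keys.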

On 2021/10/28 06:48:54, Philipp Dallig <ph...@gmail.com> wrote: 
> Hi Fabrizio,
> 
> We have two connections. First, the Zeppelin interpreter opens a 
> connection to the Zeppelin server to register and to send back the 
> interpreter output. The Zeppelin server is the CALLBACK_HOST and the 
> PORT indicates where the Zeppelin server opened the Thrift service for 
> the Zeppelin interpreter.
> 
> An important part of the registration is that the Zeppelin interpreter
> tells the Zeppelin server where the interpreter pod has an open Thrift
> server port. This information can be found in the Zeppelin server log
> output. Look out for this message:
> https://github.com/apache/zeppelin/blob/master/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java#L483
> Also note the ZEPPELIN_K8S_PORTFORWARD setting, which should help your
> Zeppelin server to reach the Zeppelin interpreter in K8s.
> 
>  > the 1st "spark-submit" in "cluster mode" is started from the client 
> (in the zeppelin host, in our case), then the 2nd "spark-submit" in 
> "client mode" is started by the "/opt/entrypoint.sh" script inside the 
> standard spark docker image.
> 
> Are you sure you are using the K8s launcher? As you can see in this part 
> of the code 
> (https://github.com/apache/zeppelin/blob/2f55fe8ed277b28d71f858633f9c9d76fd18f0c3/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java#L411), 
> Zeppelin always uses client mode.
> 
> The architecture is quite simple:
> 
> Zeppelin-Server -> Zeppelin-Interpreter (with Spark in client mode) on 
> K8s -> x-Spark-executors (based on your config)
> 
> Best Regards
> Philipp
> 
> 
> On 27.10.21 at 15:19, Fabrizio Fab wrote:
> 
> > Hi Philipp, okay, I have just realized my HUGE misunderstanding!
> >
> > The "double spark-submit" pattern is just the standard Spark-on-k8s way of running Spark applications in cluster mode:
> > the 1st "spark-submit" in "cluster mode" is started from the client (in the zeppelin host, in our case), then the 2nd "spark-submit" in "client mode" is started by the "/opt/entrypoint.sh" script inside the standard spark docker image.
> >
> > At this point I can ask a more precise question:
> >
> > I see that interpreter.sh starts the RemoteInterpreterServer with, in particular, the following parameters: CALLBACK_HOST / PORT
> > They refer to the Zeppelin host and RPC port.
> >
> > Moreover, when the interpreter starts, it runs a Thrift server on some random port.
> >
> > So I ask: which communications are supposed to happen, so that I can set up my firewall/routing rules correctly?
> >
> > -1 Must the Zeppelin server connect to the interpreter's Thrift server?
> > -2 Must the interpreter's Thrift server connect to the Zeppelin server?
> > -3 Both?
> >
> > - Which ports must the Zeppelin server / the Thrift server find open on the other server?
> >
> > Thank you everybody!
> >
> > Fabrizio

Re: Spark on k8s cluster mode, from outside of the cluster.

Posted by Philipp Dallig <ph...@gmail.com>.
Hi Fabrizio,

There are two connections. First, the Zeppelin interpreter opens a
connection to the Zeppelin server to register itself and to send back the
interpreter output. CALLBACK_HOST is the Zeppelin server, and PORT is
the port on which the Zeppelin server opened its Thrift service for
the Zeppelin interpreter.

Second, the Zeppelin server connects back to the Thrift server of the
interpreter. An important part of the registration is that the Zeppelin
interpreter tells the Zeppelin server on which port the interpreter pod
has opened its Thrift server. This information appears in the Zeppelin
server log output; look for the message logged here:
https://github.com/apache/zeppelin/blob/master/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java#L483
Also note the ZEPPELIN_K8S_PORTFORWARD option, which should help your
Zeppelin server reach the Zeppelin interpreter in K8s.
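
A quick way to test both directions is a plain TCP check. The following is
only a sketch, not part of Zeppelin: the names Flow, can_connect and report
are made up for illustration, and the hosts and ports are placeholders you
must fill in yourself. Each check has to be run from the side that opens the
connection (the interpreter pod for the first one, the Zeppelin host for the
second one).

```python
import socket
from dataclasses import dataclass


@dataclass(frozen=True)
class Flow:
    """One TCP connection that the firewall/routing rules must allow."""
    opened_by: str  # which side initiates the connection
    host: str       # destination host
    port: int       # destination port


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable host names.
        return False


def report(flow: Flow, timeout: float = 3.0) -> str:
    """Describe whether the given flow currently gets through."""
    state = "reachable" if can_connect(flow.host, flow.port, timeout) else "NOT reachable"
    return f"{flow.opened_by} -> {flow.host}:{flow.port} is {state}"


# Hypothetical usage (placeholders, not real values):
#   inside the interpreter pod:
#     print(report(Flow("interpreter", "<ZEPPELIN_HOST>", <ZEPPELIN_SERVER_RPC_PORT>)))
#   on the Zeppelin host:
#     print(report(Flow("zeppelin-server", "<interpreter pod ip>", <thrift port>)))
```

If the first check passes but the second one fails, the interpreter can
register while the server cannot call it back.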

> the 1st "spark-submit" in "cluster mode" is started from the client
> (in the zeppelin host, in our case), then the 2nd "spark-submit" in
> "client mode" is started by the "/opt/entrypoint.sh" script inside the
> standard spark docker image.

Are you sure you are using the K8s launcher? As you can see in this part 
of the code 
(https://github.com/apache/zeppelin/blob/2f55fe8ed277b28d71f858633f9c9d76fd18f0c3/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java#L411), 
Zeppelin always uses client mode.

The architecture is quite simple:

Zeppelin-Server -> Zeppelin-Interpreter (with Spark in client mode) on 
K8s -> x-Spark-executors (based on your config)

Best Regards
Philipp


On 27.10.21 at 15:19, Fabrizio Fab wrote:

> Hi Philipp, okay, I have just realized my HUGE misunderstanding!
>
> The "double spark-submit" pattern is just the standard Spark-on-K8s way of running Spark applications in cluster mode:
> the 1st "spark-submit", in "cluster mode", is started from the client (on the Zeppelin host, in our case); then the 2nd "spark-submit", in "client mode", is started by the "/opt/entrypoint.sh" script inside the standard Spark Docker image.
>
> Now I can ask a more precise question:
>
> I see that interpreter.sh starts the RemoteInterpreterServer with, in particular, the following parameters: CALLBACK_HOST / PORT
> They refer to the Zeppelin host and RPC port.
>
> Moreover, when the interpreter starts, it runs a Thrift server on some random port.
>
> So I ask: which connections are supposed to happen, so that I can set up my firewall/routing rules correctly?
>
> 1. Must the Zeppelin server connect to the Interpreter Thrift server?
> 2. Must the Interpreter Thrift server connect to the Zeppelin server?
> 3. Both?
>
> Which ports must the Zeppelin server and the Thrift server find open on the other side?
>
> Thank you everybody!
>
> Fabrizio

Re: Spark on k8s cluster mode, from outside of the cluster.

Posted by Fabrizio Fab <fa...@tiscali.it>.
Hi Philipp, okay, I have just realized my HUGE misunderstanding!

The "double spark-submit" pattern is just the standard Spark-on-K8s way of running Spark applications in cluster mode:
the 1st "spark-submit", in "cluster mode", is started from the client (on the Zeppelin host, in our case); then the 2nd "spark-submit", in "client mode", is started by the "/opt/entrypoint.sh" script inside the standard Spark Docker image.

Now I can ask a more precise question:

I see that interpreter.sh starts the RemoteInterpreterServer with, in particular, the following parameters: CALLBACK_HOST / PORT
They refer to the Zeppelin host and RPC port.

Moreover, when the interpreter starts, it runs a Thrift server on some random port.

So I ask: which connections are supposed to happen, so that I can set up my firewall/routing rules correctly?

1. Must the Zeppelin server connect to the Interpreter Thrift server?
2. Must the Interpreter Thrift server connect to the Zeppelin server?
3. Both?

Which ports must the Zeppelin server and the Thrift server find open on the other side?

Thank you everybody!

Fabrizio




On 2021/10/26 11:40:24, Philipp Dallig <ph...@gmail.com> wrote: 
> Hi Fabrizio,
> 
> At the moment, I think Zeppelin does not support running Spark jobs in
> cluster mode. In effect, though, K8s mode simulates cluster mode, because
> the Zeppelin interpreter is already started as a pod in K8s, just as a
> manual spark-submit would do in cluster mode.
> 
> Spark-submit is called only once during the start of the Zeppelin 
> interpreter. You will find the call in these lines: 
> https://github.com/apache/zeppelin/blob/2f55fe8ed277b28d71f858633f9c9d76fd18f0c3/bin/interpreter.sh#L303-L305
> 
> Best Regards
> Philipp

Re: Spark on k8s cluster mode, from outside of the cluster.

Posted by Philipp Dallig <ph...@gmail.com>.
Hi Fabrizio,

At the moment, I think Zeppelin does not support running Spark jobs in
cluster mode. In effect, though, K8s mode simulates cluster mode, because
the Zeppelin interpreter is already started as a pod in K8s, just as a
manual spark-submit would do in cluster mode.

Spark-submit is called only once during the start of the Zeppelin 
interpreter. You will find the call in these lines: 
https://github.com/apache/zeppelin/blob/2f55fe8ed277b28d71f858633f9c9d76fd18f0c3/bin/interpreter.sh#L303-L305

Best Regards
Philipp


On 25.10.21 at 21:58, Fabrizio Fab wrote:
> Dear All, I am struggling since more than a week on the following problem.
> My Zeppelin Server is running outside the k8s cluster (there is a reason for this) and I am able to run Spark zeppelin notes in Client mode but not in Cluster mode.
>
> I see that, at first, a pod for the interpreter (RemoteInterpreterServer) is created on the cluster by spark-submit from the Zeppelin host, with deployMode=cluster (and this happens without errors), then the interpreter itself runs another spark-submit  (this time from the Pod) with deployMode=client.
>
> Exactly, the following is the command line submitted by the interpreter from its pod
>
> /opt/spark/bin/spark-submit \
> --conf spark.driver.bindAddress=<ip address of the interpreter pod> \
> --deploy-mode client \
> --properties-file /opt/spark/conf/spark.properties \
> --class org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer \
> spark-internal \
> <ZEPPELIN_HOST> \
> <ZEPPELIN_SERVER_RPC_PORT> \
> <interpreter_name>-<user name>
>
> At this point, the interpreter Pod remains in "Running" state, while the Zeppelin note remains in "Pending" forever.
>
> The log of the Interpreter (level = DEBUG) at the end only says:
>   INFO [2021-10-25 18:16:58,229] ({RemoteInterpreterServer-Thread} RemoteInterpreterServer.java[run]:194) Launching ThriftServer at <ip address of the interpreter pod>:<random port>
>   INFO [2021-10-25 18:16:58,229] ({RegisterThread} RemoteInterpreterServer.java[run]:592) Start registration
>   INFO [2021-10-25 18:16:58,332] ({RegisterThread} RemoteInterpreterServer.java[run]:606) Registering interpreter process
>   INFO [2021-10-25 18:16:58,356] ({RegisterThread} RemoteInterpreterServer.java[run]:608) Registered interpreter process
>   INFO [2021-10-25 18:16:58,356] ({RegisterThread} RemoteInterpreterServer.java[run]:629) Registration finished
> (I replaced the true ip and port with a placeholder to make the log more clear for you)
>
> I am stuck at this point....
> Anyone can help me ? Thank you in advance. Fabrizio
>
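
A side note on the interpreter log quoted above: the "Launching ThriftServer
at ..." line carries the address and port that the interpreter's Thrift
server listens on. Below is a small sketch for pulling that endpoint out of a
saved log. The helper name thrift_endpoint is made up for illustration, the
pattern is based only on the log line quoted in this thread (it assumes an
IPv4 address or host name followed by a numeric port), and the address in the
usage comment is an invented example.

```python
import re
from typing import Optional, Tuple

# Pattern derived from the log line quoted in this thread; other Zeppelin
# versions may word the message differently.
THRIFT_LINE = re.compile(r"Launching ThriftServer at (?P<host>[^\s:]+):(?P<port>\d+)")


def thrift_endpoint(log_text: str) -> Optional[Tuple[str, int]]:
    """Return (host, port) from the last matching log line, or None."""
    matches = list(THRIFT_LINE.finditer(log_text))
    if not matches:
        return None
    last = matches[-1]
    return last.group("host"), int(last.group("port"))


# Hypothetical usage with an invented address:
#   thrift_endpoint("... Launching ThriftServer at 10.1.2.3:34567")
```

The returned port is the one the Zeppelin server must be able to reach on the
interpreter pod.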