Posted to user@spark.apache.org by lannyripple <la...@gmail.com> on 2014/06/11 02:48:58 UTC

spark streaming, kafka, SPARK_CLASSPATH

I am using Spark 1.0.0 compiled with Hadoop 1.2.1.

I have a toy spark-streaming-kafka program.  It reads from a kafka queue and
does

    stream
      .map {case (k, v) => (v, 1)}
      .reduceByKey(_ + _)
      .print()

using a 1 second interval on the stream.
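
For reference, the whole toy driver is roughly the following (the ZooKeeper
quorum, consumer group, and topic names below are placeholders):

    package me

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object KafkaStreamingWC {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("KafkaStreamingWC")
        val ssc  = new StreamingContext(conf, Seconds(1))  // 1 second batch interval

        // placeholder ZooKeeper quorum, consumer group, and topic -> receiver thread count
        val stream = KafkaUtils.createStream(ssc, "zk-host:2181", "kafka-wc", Map("events" -> 1))

        stream
          .map { case (k, v) => (v, 1) }
          .reduceByKey(_ + _)
          .print()

        ssc.start()
        ssc.awaitTermination()
      }
    }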

The docs say to make Spark and Hadoop jars 'provided' but this breaks for
spark-streaming.  Including spark-streaming (and spark-streaming-kafka) as
'compile' to sweep them into our assembly gives collisions on javax.*
classes.  To work around this I modified
$SPARK_HOME/bin/compute-classpath.sh to include spark-streaming,
spark-streaming-kafka, and zkclient.  (Note that kafka is included as
'compile' in my project and picked up in the assembly.)
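
In sbt terms the scopes described above look roughly like this (the version
numbers, and the kafka coordinates in particular, are only illustrative):

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"            % "1.0.0" % "provided",
      "org.apache.spark" %% "spark-streaming"       % "1.0.0",       // 'compile', swept into the assembly
      "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0",       // 'compile', swept into the assembly
      "org.apache.kafka" %% "kafka"                 % "0.8.1.1"      // 'compile', version illustrative
    )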

I have set up conf/spark-env.sh as needed.  I have copied my assembly to
/tmp/myjar.jar on all spark hosts and to my hdfs /tmp/jars directory.  I am
running spark-submit from my spark master.  I am guided by the information
here https://spark.apache.org/docs/latest/submitting-applications.html

Well, at this point I was going to detail all the ways spark-submit fails to
follow its own documentation.  If I do not invoke sparkContext.setJars()
then it just fails to find the driver class.  This is using various
combinations of absolute path, file:, hdfs: (Warning: Skip remote jar)??,
and local: prefixes on the application-jar and --jars arguments.

If I invoke sparkContext.setJars() and include my assembly jar I get
further.  At this point I get a failure from
kafka.consumer.ConsumerConnector not being found.  I suspect this is because
spark-streaming-kafka needs the Kafka dependency but my assembly jar is
too late in the classpath.

At this point I try setting spark.files.userClassPathFirst to 'true' but
this causes more things to blow up.
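
Concretely, the driver-side configuration I'm describing is roughly this
(jar path as above, imports as in the driver):

    val conf = new SparkConf()
      .setAppName("KafkaStreamingWC")
      .setJars(Seq("/tmp/myjar.jar"))                   // ship the assembly to the executors
      .set("spark.files.userClassPathFirst", "true")    // the setting that made things blow up
    val ssc = new StreamingContext(conf, Seconds(1))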

I finally found something that works: setting the environment variable
SPARK_CLASSPATH=/tmp/myjar.jar.  But silly me, this is deprecated and I'm
helpfully informed to

  Please instead use:
   - ./spark-submit with --driver-class-path to augment the driver classpath
   - spark.executor.extraClassPath to augment the executor classpath

which when put into a file and introduced with --properties-file does not
work.  (Also tried spark.files.userClassPathFirst here.)  These fail with
the kafka.consumer.ConsumerConnector error.
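
For the record, the properties file contained roughly the two settings named
in that message, pointed at the assembly:

    spark.executor.extraClassPath   /tmp/myjar.jar
    spark.files.userClassPathFirst  true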

At a guess what's going on is that using SPARK_CLASSPATH I have my assembly
jar in the classpath at SparkSubmit invocation 

  Spark Command: java -cp
/tmp/myjar.jar::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.0.0-hadoop1.2.1.jar:/opt/spark/lib/spark-streaming_2.10-1.0.0.jar:/opt/spark/lib/spark-streaming-kafka_2.10-1.0.0.jar:/opt/spark/lib/zkclient-0.4.jar
-XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m
org.apache.spark.deploy.SparkSubmit --class me.KafkaStreamingWC
/tmp/myjar.jar

but when using --properties-file the assembly is not available to
SparkSubmit.

I think the root cause is either spark-submit not handling the
spark-streaming libraries so they can be 'provided', or the inclusion of
org.eclipse.jetty.orbit in the streaming libraries, which causes

  [error] (*:assembly) deduplicate: different file contents found in the
following:
  [error]
/Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.transaction/orbits/javax.transaction-1.1.1.v201105210645.jar:META-INF/ECLIPSEF.RSA
  [error]
/Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.servlet/orbits/javax.servlet-3.0.0.v201112011016.jar:META-INF/ECLIPSEF.RSA
  [error]
/Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.mail.glassfish/orbits/javax.mail.glassfish-1.4.1.v201005082020.jar:META-INF/ECLIPSEF.RSA
  [error]
/Users/lanny/.ivy2/cache/org.eclipse.jetty.orbit/javax.activation/orbits/javax.activation-1.1.0.v201105071233.jar:META-INF/ECLIPSEF.RSA

I've tried applying a mergeStrategy in assembly for my assembly.sbt but then I
get

  Invalid signature file digest for Manifest main attributes
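
The usual sbt-assembly workaround for that signature error is to discard the
jar signature files outright; roughly (assuming AssemblyKeys is imported as
usual, and the matching here is only illustrative):

    mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
      {
        // signed-jar metadata cannot survive re-packaging, so drop it
        case x if x.endsWith(".SF") || x.endsWith(".DSA") || x.endsWith(".RSA") => MergeStrategy.discard
        case x => old(x)
      }
    }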

If anyone knows the magic to get this working a reply would be greatly
appreciated.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-SPARK-CLASSPATH-tp7356.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: spark streaming, kafka, SPARK_CLASSPATH

Posted by Andrew Or <an...@databricks.com>.
Hi all,

The short answer is that standalone-cluster mode through spark-submit is
broken (and in fact not officially supported). Please use standalone-client
mode instead.
The long answer is provided here:
http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3cCAMJOb8m6gF9B3W=P12hi88MexkooN15-1jKVs8pBLmuwh9R2Yw@mail.gmail.com%3e

Andrew


Re: spark streaming, kafka, SPARK_CLASSPATH

Posted by lannyripple <la...@gmail.com>.
Gino,

I can confirm that your solution of assembling with spark-streaming-kafka
but excluding spark-core and spark-streaming has me working with basic
spark-submit.  As mentioned, you must specify the assembly jar in the
SparkConf as well as to spark-submit.

When I see the error you are now experiencing I just restart my cluster
(sbin/stop-all.sh; sleep 6; sbin/start-all.sh).  My thought is a resource
leak somewhere but I haven't tried to chase it down since restarting is nice
and quick.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-SPARK-CLASSPATH-tp7356p7941.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: spark streaming, kafka, SPARK_CLASSPATH

Posted by Gino Bustelo <lb...@gmail.com>.
Luis' experience validates what I'm seeing. You still have to set the properties in the SparkConf for the context to work. For example, the master URL and jars are specified again in the app.

Gino B.

Re: spark streaming, kafka, SPARK_CLASSPATH

Posted by Luis Ángel Vicente Sánchez <la...@gmail.com>.
I have been able to submit a job successfully but I had to configure my spark
job this way:

  val sparkConf: SparkConf =
    new SparkConf()
      .setAppName("TwitterPopularTags")
      .setMaster("spark://int-spark-master:7077")
      .setSparkHome("/opt/spark")
      .setJars(Seq("/tmp/spark-test-0.1-SNAPSHOT.jar"))

Now I'm getting this error on my worker:

4/06/17 17:03:40 WARN TaskSchedulerImpl: Initial job has not accepted any
resources; check your cluster UI to ensure that workers are registered and
have sufficient memory



Re: spark streaming, kafka, SPARK_CLASSPATH

Posted by Luis Ángel Vicente Sánchez <la...@gmail.com>.
After playing a bit, I have been able to create a fatjar this way:

lazy val rootDependencies = Seq(
  "org.apache.spark" %% "spark-core"              % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming"         % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
exclude("org.apache.spark", "spark-core_2.10") exclude("org.apache.spark",
"spark-streaming_2.10")
)

Excluding those transitive dependencies, we can create a fatjar of ~400 KB
instead of 40 MB.

My problem is not running the streaming job locally but submitting it to a
standalone cluster using spark-submit; every time I ran the following
command, my workers died:

~/development/tools/spark/1.0.0/bin/spark-submit \
--class "org.apache.spark.examples.streaming.TwitterPopularTags" \
--master "spark://int-spark-master:7077" \
--deploy-mode "cluster" \
file:///tmp/spark-test-0.1-SNAPSHOT.jar

I have copied my fatjar to my master /tmp folder.



Re: spark streaming, kafka, SPARK_CLASSPATH

Posted by Michael Cutler <mi...@tumra.com>.
Admittedly getting Spark Streaming / Kafka working for the first time can
be a bit tricky with the web of dependencies that get pulled in.  I've
taken the KafkaWordCount example from the Spark project and set up a simple
standalone SBT project that shows you how to get it working and using
spark-submit.

https://github.com/cotdp/spark-example-kafka

The key trick is in the use of sbt-assembly instead of relying on any of
the "add jars" functionality.  You mark "spark-core" and "spark-streaming"
as provided, because they are part of the core spark-assembly already
running your cluster.  However "spark-streaming-kafka" is not, so you need
to package it in your 'fat JAR' while excluding all the mess that causes
the build to break.

build.sbt (https://github.com/cotdp/spark-example-kafka/blob/master/build.sbt):

import AssemblyKeys._

assemblySettings

name := "spark-example-kafka"

version := "1.0"

scalaVersion := "2.10.4"

jarName in assembly := "spark-example-kafka_2.10-1.0.jar"

assemblyOption in assembly ~= { _.copy(includeScala = false) }

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided",
  ("org.apache.spark" %% "spark-streaming-kafka" % "1.0.0").
    exclude("commons-beanutils", "commons-beanutils").
    exclude("commons-collections", "commons-collections").
    exclude("com.esotericsoftware.minlog", "minlog")
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
    case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
    case x if x.startsWith("plugin.properties") => MergeStrategy.last
    case x => old(x)
  }
}


You can see the "exclude()" has to go around the spark-streaming-kafka
dependency,
and I've used a MergeStrategy to solve the "deduplicate: different file
contents found in the following" errors.

Build the JAR with sbt assembly and use the scripts in bin/ to run the
examples.
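
This assumes sbt-assembly is already on the build classpath via
project/plugins.sbt, roughly (plugin version illustrative for that era):

    // project/plugins.sbt
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")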

I'm using this same approach to run my Spark Streaming jobs with
spark-submit and have them managed using Mesos/Marathon
(http://mesosphere.io/) to handle failures and restarts with long-running
processes.

Good luck!

MC





Michael Cutler
Founder, CTO

Mobile: +44 789 990 7847
Email:  michael@tumra.com
Web:    tumra.com
Visit us at our offices in Chiswick Park
Registered in England & Wales, 07916412. VAT No. 130595328


This email and any files transmitted with it are confidential and may also
be privileged. It is intended only for the person to whom it is addressed.
If you have received this email in error, please inform the sender immediately.
If you are not the intended recipient you must not use, disclose, copy,
print, distribute or rely on this email.



Re: spark streaming, kafka, SPARK_CLASSPATH

Posted by Gino Bustelo <lb...@gmail.com>.
+1 for this issue. Documentation for spark-submit is misleading. Among many issues, the jar support is bad. HTTP urls do not work. This is because spark is using hadoop's FileSystem class. You have to specify the jars twice to get things to work: once for the DriverWrapper to load your classes, and a 2nd time in the Context to distribute them to workers.
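
Roughly, that means something like this (paths and class name are only illustrative):

    import org.apache.spark.SparkConf

    // 1) once on the command line, so the driver (DriverWrapper) can load your classes:
    //      spark-submit --class me.KafkaStreamingWC --master spark://master:7077 /tmp/myjar.jar
    // 2) and again in the SparkConf, so the context distributes the jar to the workers:
    val conf = new SparkConf()
      .setAppName("KafkaStreamingWC")
      .setJars(Seq("/tmp/myjar.jar"))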

I would like to see some contrib response to this issue. 

Gino B.


Re: spark streaming, kafka, SPARK_CLASSPATH

Posted by Luis Ángel Vicente Sánchez <la...@gmail.com>.
Did you manage to make it work? I'm facing similar problems and this is a
serious blocker issue. spark-submit seems kind of broken to me if you can't
use it for spark-streaming.

Regards,

Luis

