Posted to user@spark.apache.org by anna stax <an...@gmail.com> on 2019/03/18 21:06:46 UTC

Writing the contents of spark dataframe to Kafka with Spark 2.2

Hi all,
I am unable to write the contents of a Spark DataFrame to Kafka.
I am using Spark 2.2.

This is my code:

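// Assumes a SparkSession named `spark` and `import spark.implicits._` are in scope (needed for toDF).
val df = Seq(("1","One"),("2","two")).toDF("key","value")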
df.printSchema()
df.show(false)
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("topic", "testtopic")
  .save()

and I am getting the following error message
[Stage 0:>                                                          (0 + 2) / 2]
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Cast$.apply$default$3()Lscala/Option;
    at org.apache.spark.sql.kafka010.KafkaWriteTask.createProjection(KafkaWriteTask.scala:112)
    at org.apache.spark.sql.kafka010.KafkaWriteTask.<init>(KafkaWriteTask.scala:39)
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:90)
    at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I have added this dependency

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.2.2</version>
</dependency>

Appreciate any help. Thanks.
https://stackoverflow.com/questions/55229945/writing-the-contents-of-spark-dataframe-to-kafka-with-spark-2-2

Re: Writing the contents of spark dataframe to Kafka with Spark 2.2

Posted by Gabor Somogyi <ga...@gmail.com>.
Hi Anna,

Another classic source of this kind of problem is the classpath. Check it and
look for any Spark artifact whose version is not 2.2.2.

BR,
G
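
One way to act on the classpath advice is to ask the JVM where a class was actually loaded from. A NoSuchMethodError at runtime means the class found on the classpath lacks a method that the calling code was compiled against, so the jar location often reveals a stray version. The sketch below is only an illustration: the object ClasspathCheck and its helpers are hypothetical names, and it relies on nothing beyond standard JDK reflection.

```scala
import scala.util.Try

// Hypothetical helper, not part of Spark: inspects the runtime classpath via JDK reflection.
object ClasspathCheck {

  // Loads the class without initializing it; None if it is not on the classpath.
  private def load(className: String): Option[Class[_]] =
    Try(Class.forName(className, false, getClass.getClassLoader)).toOption

  // Location (usually a jar path) the class was loaded from.
  // None if the class is missing or has no code source (e.g. JDK classes).
  def jarOf(className: String): Option[String] =
    load(className)
      .flatMap(c => Option(c.getProtectionDomain))
      .flatMap(pd => Option(pd.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  // True if the class is loadable and exposes a public method with this name.
  def hasMethod(className: String, methodName: String): Boolean =
    load(className).exists(_.getMethods.exists(_.getName == methodName))

  def main(args: Array[String]): Unit = {
    // Companion object and method named in the stack trace of this thread.
    val cast = "org.apache.spark.sql.catalyst.expressions.Cast$"
    println(s"Cast loaded from: ${jarOf(cast).getOrElse("<not found>")}")
    println(s"has apply$$default$$3: ${hasMethod(cast, "apply$default$3")}")
  }
}
```

Run with the same classpath as the failing job, a jar path that does not carry the expected 2.2.2 version would point at the conflicting artifact.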


On Wed, Mar 20, 2019 at 12:56 AM anna stax <an...@gmail.com> wrote:

> Hi Gabor,
>
> I am just trying out.  Desperately trying to make writing to kafka work
> using spark-sql-kafka.
> In my deployment project I do use the provided scope.
>
>
> On Tue, Mar 19, 2019 at 8:50 AM Gabor Somogyi <ga...@gmail.com>
> wrote:
>
>> Hi Anna,
>>
>> Looks like some sort of version mismatch.
>>
>> Presume scala version double checked...
>>
>> Not sure why the mentioned artifacts are not in provided scope.
>> It will end-up in significantly smaller jar + these artifacts should be
>> available on the cluster (either by default for example core or due to
>> --packages for example the sql-kafka).
>>
>> BR,
>> G
>>
>>
>> On Tue, Mar 19, 2019 at 4:35 PM anna stax <an...@gmail.com> wrote:
>>
>>> Hi Gabor,
>>>
>>> Thank you for the response.
>>>
>>> I do have those dependencies added.
>>>
>>>  <dependency>
>>>   <groupId>org.apache.spark</groupId>
>>>   <artifactId>spark-core_2.11</artifactId>
>>>   <version>2.2.2</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.spark</groupId>
>>>   <artifactId>spark-sql_2.11</artifactId>
>>>   <version>2.2.2</version>
>>> </dependency>
>>> <dependency>
>>>   <groupId>org.apache.spark</groupId>
>>>   <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
>>>   <version>2.2.2</version>
>>> </dependency>
>>>
>>> and my kafka version is kafka_2.11-1.1.0
>>>
>>>
>>> On Tue, Mar 19, 2019 at 12:48 AM Gabor Somogyi <
>>> gabor.g.somogyi@gmail.com> wrote:
>>>
>>>> Hi Anna,
>>>>
>>>>   Have you added spark-sql-kafka-0-10_2.11:2.2.0 package as well?
>>>> Further info can be found here:
>>>> https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#deploying
>>>> The same --packages option can be used with spark-shell as well...
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Mon, Mar 18, 2019 at 10:07 PM anna stax <an...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>> I am unable to write the contents of spark dataframe to Kafka.
>>>>> I am using Spark 2.2
>>>>>
>>>>> This is my code
>>>>>
>>>>> val df = Seq(("1","One"),("2","two")).toDF("key","value")
>>>>> df.printSchema()
>>>>> df.show(false)
>>>>> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>>>>>   .write
>>>>>   .format("kafka")
>>>>>   .option("kafka.bootstrap.servers", "127.0.0.1:9092")
>>>>>   .option("topic", "testtopic")
>>>>>   .save()
>>>>>
>>>>> and I am getting the following error message
>>>>> [Stage 0:>                                                          (0
>>>>> + 2) / 2]Exception in thread "main" org.apache.spark.SparkException: Job
>>>>> aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most
>>>>> recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor
>>>>> driver): java.lang.NoSuchMethodError:
>>>>> org.apache.spark.sql.catalyst.expressions.Cast$.apply$default$3()Lscala/Option;
>>>>> at
>>>>> org.apache.spark.sql.kafka010.KafkaWriteTask.createProjection(KafkaWriteTask.scala:112)
>>>>> at
>>>>> org.apache.spark.sql.kafka010.KafkaWriteTask.<init>(KafkaWriteTask.scala:39)
>>>>> at
>>>>> org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:90)
>>>>> at
>>>>> org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
>>>>> at
>>>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>>>>> at
>>>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>>>>> at
>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
>>>>> at
>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>> at
>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>> at
>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> I have added this dependency
>>>>>
>>>>> <dependency>
>>>>>   <groupId>org.apache.spark</groupId>
>>>>>   <artifactId>spark-sql_2.11</artifactId>
>>>>>   <version>2.2.2</version>
>>>>> </dependency>
>>>>>
>>>>> Appreciate any help. Thanks.
>>>>>
>>>>> https://stackoverflow.com/questions/55229945/writing-the-contents-of-spark-dataframe-to-kafka-with-spark-2-2
>>>>>
>>>>

Re: Writing the contents of spark dataframe to Kafka with Spark 2.2

Posted by anna stax <an...@gmail.com>.
Hi Gabor,

I am just trying things out, desperately trying to make writing to Kafka work
using spark-sql-kafka.
In my deployment project I do use the provided scope.



Re: Writing the contents of spark dataframe to Kafka with Spark 2.2

Posted by Gabor Somogyi <ga...@gmail.com>.
Hi Anna,

Looks like some sort of version mismatch.

I presume the Scala version has been double-checked.

I am not sure why the mentioned artifacts are not in provided scope.
Using provided scope results in a significantly smaller jar, and these
artifacts should be available on the cluster anyway (either by default, as
with core, or through --packages, as with sql-kafka).

BR,
G
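
A version mismatch of this kind can also be looked for mechanically in a dependency listing. The sketch below is a hypothetical helper (DependencyCheck, Artifact, and mismatches are illustrative names, not a Spark or Maven API): it takes group:artifact:version coordinates and flags any org.apache.spark entry whose version or Scala suffix differs from the expected one. The spark-catalyst 2.1.0 entry in the usage example is invented purely to show a flagged result; it is not taken from this thread.

```scala
// Hypothetical helper for illustration; not part of Spark or Maven.
object DependencyCheck {

  final case class Artifact(group: String, name: String, version: String)

  // Parses "group:artifact:version"; returns None for any other shape.
  def parse(coordinate: String): Option[Artifact] =
    coordinate.split(":") match {
      case Array(g, n, v) => Some(Artifact(g, n, v))
      case _              => None
    }

  // Spark artifacts whose version or Scala binary suffix differs from the expected values.
  def mismatches(artifacts: Seq[Artifact], sparkVersion: String, scalaBinary: String): Seq[Artifact] =
    artifacts.filter { a =>
      a.group == "org.apache.spark" &&
        (a.version != sparkVersion || !a.name.endsWith("_" + scalaBinary))
    }

  def main(args: Array[String]): Unit = {
    val coordinates = Seq(
      "org.apache.spark:spark-core_2.11:2.2.2",
      "org.apache.spark:spark-sql_2.11:2.2.2",
      "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.2",
      "org.apache.spark:spark-catalyst_2.11:2.1.0" // invented stray entry, for illustration only
    )
    // Prints every Spark artifact that does not match Spark 2.2.2 built for Scala 2.11.
    mismatches(coordinates.flatMap(parse), "2.2.2", "2.11").foreach(println)
  }
}
```

In a Maven build, the input could be derived from the resolved dependency tree, after adapting the parsing to that tool's output format.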



Re: Writing the contents of spark dataframe to Kafka with Spark 2.2

Posted by anna stax <an...@gmail.com>.
Hi Gabor,

Thank you for the response.

I do have those dependencies added.

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.2</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.2.2</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.2</version>
</dependency>

My Kafka version is kafka_2.11-1.1.0.



Re: Writing the contents of spark dataframe to Kafka with Spark 2.2

Posted by Gabor Somogyi <ga...@gmail.com>.
Hi Anna,

Have you added the spark-sql-kafka-0-10_2.11:2.2.0 package as well?
Further info can be found here:
https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#deploying
The same --packages option can be used with spark-shell as well.

BR,
G

