Posted to user@spark.apache.org by kaniska <ka...@gmail.com> on 2017/03/24 17:49:38 UTC

unable to stream kafka messages

Hi,

I am currently encountering the following exception while working with
the code snippet below:

> Please suggest the correct approach for reading the stream into a SQL
> schema.
> If I add 'tweetSchema' while reading the stream, it errors out with the message:
> we can not change static schema for kafka.

-------------------------------------------------------------------------------------------

*exception*

Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
'`location`' given input columns: [topic, timestamp, key, offset, value,
timestampType, partition]*;
	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
--------------------------------------------------------------------------------------------------------

*structured streaming code snippet*

String bootstrapServers = "localhost:9092";
String subscribeType = "subscribe";
String topics = "events";

StructType tweetSchema = new StructType()
    .add("tweetId", "string")
    .add("tweetText", "string")
    .add("location", "string")
    .add("timestamp", "string");

SparkSession spark = SparkSession
    .builder()
    .appName("StreamProcessor")
    .config("spark.master", "local")
    .getOrCreate();

Dataset<Tweet> streams = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option(subscribeType, topics)
    .load()
    .as(Encoders.bean(Tweet.class));

streams.createOrReplaceTempView("streamsData");

String sql = "SELECT location, COUNT(*) as count FROM streamsData GROUP BY location";
Dataset<Row> countsByLocation = spark.sql(sql);

StreamingQuery query = countsByLocation.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

query.awaitTermination();
--------------------------------------------------------------------------------------------------

*Tweet *

Tweet.java - has public constructor and getter / setter methods

public class Tweet implements Serializable{
	
	private String tweetId;
	private String tweetText;
	private String location;
	private String timestamp;
	
	public Tweet(){
		
	}
.............  

----------------------------------------------------------------------------------------

*pom.xml *


		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>2.1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>2.1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
			<version>2.1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.10</artifactId>
			<version>2.1.0</version>
		</dependency>
		<dependency>
    		<groupId>org.apache.spark</groupId>
    		<artifactId>spark-sql-kafka-0-10_2.10</artifactId>
    		<version>2.1.0</version>
		</dependency>
------------------------------------------------------------------------------------



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: unable to stream kafka messages

Posted by Michael Armbrust <mi...@databricks.com>.
Ah, I understand what you are asking now.  There is no API for specifying a
Kafka-specific "decoder", since Spark SQL already has a rich language for
expressing transformations.  The DataFrame code I gave will parse the JSON
and materialize it in a class, very similar to what objectMapper.readValue(bytes,
Tweet.class) would do.
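
As a hedged illustration of the byte-level step only (plain Java, no Spark): casting the Kafka value column to a string amounts to decoding the bytes as UTF-8, so it may be safer for the producer to encode with an explicit charset than to rely on msg.getBytes(), which uses the platform default charset and is not guaranteed to be UTF-8 on older JVMs. The class and method names below are hypothetical, and the JSON text is hand-written for the example.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Spark or of the code in this thread.
class TweetBytes {

    // Producer side: encode the JSON text with an explicit charset.
    static byte[] encode(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: decode the bytes as UTF-8 text.
    static String decode(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hand-written sample payload with a non-ASCII character.
        String json = "{\"tweetId\":\"1\",\"location\":\"Z\u00fcrich\"}";
        byte[] value = encode(json);
        // The round trip preserves the text when both sides agree on UTF-8.
        System.out.println(decode(value).equals(json));
    }
}
```

With both sides fixed to UTF-8, non-ASCII tweet text should survive the trip through Kafka unchanged before from_json parses it.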

However, there are other cases where you might need to do some
domain-specific transformation that Spark SQL doesn't support natively.  In this
case you can write a UDF that does the translation. There are a couple of
different ways you can specify this, depending on whether you want to
map/flatMap or just apply the function as a UDF to a single column
<http://stackoverflow.com/questions/35348058/how-do-i-call-a-udf-on-a-spark-dataframe-using-java>.
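
A minimal sketch of the kind of translation such a UDF might perform, assuming a made-up pipe-delimited payload ("tweetId|location") instead of JSON. Only the decoding logic is shown, in plain Java; PipeDecoder and locationOf are hypothetical names. In Spark's Java API a function like this would typically be wrapped in a UDF1 and registered through spark.udf().register(...) before being applied to the value column, which is not shown here.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical decoder for an assumed "tweetId|location" payload; not from the thread.
class PipeDecoder {

    // Returns the location field, or null when the payload is missing or malformed.
    static String locationOf(byte[] value) {
        if (value == null) {
            return null;
        }
        String text = new String(value, StandardCharsets.UTF_8);
        // Limit -1 keeps trailing empty fields so malformed input is detected by count.
        String[] parts = text.split("\\|", -1);
        return parts.length == 2 ? parts[1] : null;
    }

    public static void main(String[] args) {
        byte[] payload = "42|Berlin".getBytes(StandardCharsets.UTF_8);
        System.out.println(locationOf(payload)); // prints "Berlin"
    }
}
```

Returning null for malformed input, rather than throwing, keeps one bad record from failing the whole streaming query; whether that is the right trade-off depends on the data.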


On Mon, Mar 27, 2017 at 1:59 PM, kaniska Mandal <ka...@gmail.com>
wrote:

> yup, that solves the compilation issue :-)
>
> one quick question regarding specifying Decoder in kafka stream:
>
> please note that I am encoding the message as follows while sending data
> to kafka -
>
> <TweetEncoder>
>
> *String msg = objectMapper.writeValueAsString(tweetEvent);*
>
> *return msg.getBytes();*
>
> I have a corresponding <TweetDecoder>
>
> *return objectMapper.readValue(bytes, Tweet.class)*
>
>
> *>> how do I specify the Decoder in the following stream-processing flow ?*
> streams = spark
>   .readStream()
>   .format("kafka")
>   .option("kafka.bootstrap.servers", bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>   .withColumn("message", from_json(col("value").cast("string"),
> tweetSchema)) // cast the binary value to a string and parse it as json
>   .select("message.*") // unnest the json
>   .as(Encoders.bean(Tweet.class))
>
> Thanks
> Kaniska
>
> ---------------------------------------------
>
> On Mon, Mar 27, 2017 at 1:25 PM, Michael Armbrust <mi...@databricks.com>
> wrote:
>
>> You need to import col from org.apache.spark.sql.functions.
>>
>> On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal <kaniska.mandal@gmail.com>
>> wrote:
>>
>>> Hi Michael,
>>>
>>> Can you please check if I am using correct version of spark-streaming
>>> library as specified in my pom (specified in the email) ?
>>>
>>> col("value").cast("string") - throwing an error 'cannot find symbol
>>> method col(java.lang.String)'
>>> I tried $"value" which results into similar compilation error.
>>>
>>> Thanks
>>> Kaniska
>>>
>>>
>>>
>>> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <
>>> michael@databricks.com> wrote:
>>>
>>>> Sorry, I don't think that I understand the question.  Value is just a
>>>> binary blob that we get from Kafka and pass to you.  If it's stored as JSON,
>>>> I think the code I provided is a good option, but if you are using a
>>>> different encoding you may need to write a UDF.
>>>>
>>>> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <
>>>> kaniska.mandal@gmail.com> wrote:
>>>>
>>>>> Hi Michael,
>>>>>
>>>>> Thanks much for the suggestion.
>>>>>
>>>>> I was wondering: what's the best way to deserialize the 'value' field?
>>>>>
>>>>>
>>>>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
>>>>> michael@databricks.com> wrote:
>>>>>
>>>>>> Encoders can only map data into an object if those columns already
>>>>>> exist.  When we are reading from Kafka, we just get a binary blob and
>>>>>> you'll need to help Spark parse that first.  Assuming your data is stored
>>>>>> in JSON, it should be pretty straightforward.
>>>>>>
>>>>>> streams = spark
>>>>>>   .readStream()
>>>>>>   .format("kafka")
>>>>>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>>>>>   .option(subscribeType, topics)
>>>>>>   .load()
>>>>>>   // cast the binary value to a string and parse it as json
>>>>>>   .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
>>>>>>   .select("message.*") // unnest the json
>>>>>>   // only required if you want to use lambda functions on the data using this class
>>>>>>   .as(Encoders.bean(Tweet.class))
>>>>>>
>>>>>> Here is some more info on working with JSON and other
>>>>>> semi-structured formats
>>>>>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>.

Re: unable to stream kafka messages

Posted by kaniska Mandal <ka...@gmail.com>.
yup, that solves the compilation issue :-)

One quick question regarding specifying a decoder for the Kafka stream:

Please note that I am encoding the message as follows while sending data to
Kafka:

<TweetEncoder>

*String msg = objectMapper.writeValueAsString(tweetEvent);*

*return msg.getBytes();*

I have a corresponding <TweetDecoder>

*return objectMapper.readValue(bytes, Tweet.class)*


*>> how do I specify the Decoder in the following stream-processing flow ?*
streams = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  // cast the binary value to a string and parse it as json
  .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
  .select("message.*") // unnest the json
  .as(Encoders.bean(Tweet.class))

Thanks
Kaniska


Re: unable to stream kafka messages

Posted by Michael Armbrust <mi...@databricks.com>.
You need to import col from org.apache.spark.sql.functions.


Re: unable to stream kafka messages

Posted by kaniska Mandal <ka...@gmail.com>.
Hi Michael,

Can you please check whether I am using the correct version of the
spark-streaming library, as specified in the pom in my original email?

col("value").cast("string") throws the error 'cannot find symbol method
col(java.lang.String)'.
I also tried $"value", which results in a similar compilation error.

Thanks
Kaniska



On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> Sorry, I don't think that I understand the question.  Value is just a
> binary blob that we get from kafka and pass to you.  If its stored in JSON,
> I think the code I provided is a good option, but if you are using a
> different encoding you may need to write a UDF.

Re: unable to stream kafka messages

Posted by Michael Armbrust <mi...@databricks.com>.
Sorry, I don't think that I understand the question.  Value is just a
binary blob that we get from Kafka and pass to you.  If it's stored as JSON,
I think the code I provided is a good option, but if you are using a
different encoding you may need to write a UDF.
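
For a non-JSON payload, the body of such a UDF is simply the inverse of
whatever the producer did when it wrote the message. A minimal sketch in
plain Java, assuming (hypothetically) that each Kafka value is UTF-8 text
with four tab-separated fields in the order tweetId, tweetText, location,
timestamp. The class name TweetDecoder and the tab-delimited wire format are
illustrative assumptions, not something stated in this thread; only the four
field names come from the tweetSchema in the original post.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical decoder for an assumed wire format: UTF-8 text with four
// tab-separated fields (tweetId, tweetText, location, timestamp).
public class TweetDecoder {

    private static final String[] FIELDS =
        {"tweetId", "tweetText", "location", "timestamp"};

    // Turns the raw Kafka 'value' bytes into a field-name -> value map.
    // Returns null for null or malformed input, so that a single bad
    // record can be filtered out instead of failing the whole query.
    public static Map<String, String> decode(byte[] value) {
        if (value == null) {
            return null;
        }
        String text = new String(value, StandardCharsets.UTF_8);
        // A limit of -1 keeps trailing empty fields instead of dropping them.
        String[] parts = text.split("\t", -1);
        if (parts.length != FIELDS.length) {
            return null;
        }
        Map<String, String> tweet = new LinkedHashMap<>();
        for (int i = 0; i < FIELDS.length; i++) {
            tweet.put(FIELDS[i], parts[i]);
        }
        return tweet;
    }

    public static void main(String[] args) {
        byte[] raw = "42\thello spark\tSan Francisco\t2017-03-24T17:49:38Z"
            .getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(raw).get("location")); // prints "San Francisco"
    }
}
```

Returning null on malformed input is only one possible choice; depending on
the pipeline, it may be preferable to route bad records elsewhere. Logic of
this kind is what would be wrapped in a UDF and applied to the 'value'
column.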

On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <ka...@gmail.com>
wrote:

> Hi Michael,
>
> Thanks much for the suggestion.
>
> I was wondering: what's the best way to deserialize the 'value' field?

Re: unable to stream kafka messages

Posted by kaniska Mandal <ka...@gmail.com>.
Hi Michael,

Thanks much for the suggestion.

I was wondering: what's the best way to deserialize the 'value' field?


On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <mi...@databricks.com>
wrote:

> Encoders can only map data into an object if those columns already exist.
> When reading from Kafka, we just get a binary blob, and you'll need to
> help Spark parse that first.  Assuming your data is stored as JSON, it
> should be pretty straightforward.

Re: unable to stream kafka messages

Posted by Michael Armbrust <mi...@databricks.com>.
Encoders can only map data into an object if those columns already exist.
When reading from Kafka, we just get a binary blob, and you'll need to
help Spark parse that first.  Assuming your data is stored as JSON, it
should be pretty straightforward.

// In Java, col and from_json need static imports at the top of the file
// ($"value" is Scala-only syntax):
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

streams = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  // cast the binary value to a string and parse it as json
  .withColumn("message", from_json(col("value").cast("string"), tweetSchema))
  .select("message.*") // unnest the json
  // only required if you want to use lambda functions on the data using
  // this class
  .as(Encoders.bean(Tweet.class));

Here is some more info on working with JSON and other semi-structured
formats:
<https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>


Re: unable to stream kafka messages

Posted by cbowden <cb...@gmail.com>.
The exception is telling you precisely what is wrong. The Kafka source has a
schema of (topic, partition, offset, key, value, timestamp, timestampType).
Nothing about those columns makes sense as a tweet. You need to tell Spark
how to get from bytes to a Tweet; it doesn't know how you serialized the
messages into Kafka.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/unable-to-stream-kafka-messages-tp28537p29107.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org