You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by satyajit vegesna <sa...@gmail.com> on 2017/12/11 02:28:58 UTC

Infer JSON schema in structured streaming Kafka.

Hi All,

I would like to infer JSON schema from a sample of data that i receive
from, Kafka Streams(specific topic), and i have to infer the schema as i am
going to receive random JSON string with different schema for each topic,
so i chose to go ahead with below steps,

a. readStream from Kafka(latest offset), from a single Kafka topic.
b. Some how to store the JSON string into val and infer the schema.
c. stop the stream.
d.Create new readStream(smallest offset) and use the above inferred schema
to process the JSON using spark provided JSON support, like from_json,
json_object and others and run my actuall business logic.

Now i am not sure how to be successful with step(b). Any help would be
appreciated.
And would also like to know if there is any better approach.

Regards,
Satyajit.

Re: Infer JSON schema in structured streaming Kafka.

Posted by satyajit vegesna <sa...@gmail.com>.
Hi Burak,

Thank you , for the inputs, would definitely try the options.

The reason we don't have an unified schema is because we are trying to
consume data from different topics that contains data from different tables
from a DB, and so each table has different columns.

Regards,
Satyajit.

On Dec 11, 2017 9:29 AM, "Burak Yavuz" <br...@gmail.com> wrote:

> In Spark 2.2, you can read from Kafka in batch mode, and then use the json
> reader to infer schema:
>
> val df = spark.read.format("kafka")...
>   .select($"value.cast("string"))
> val json = spark.read.json(df)
> val schema = json.schema
>
> While the above should be slow (since you're reading almost all data in
> Kafka in batch), but it would work.
>
> My question to you is, do you think it's worth it? Why do you have a
> random json schema being inputted to your Kafka stream? Can this randomness
> not mess up everything in the future if someone messes up? Not having
> fixed, known schemas with streaming data (or any data for that matter) is
> dangerous for most purposes.
> Just food for thought.
>
> Best,
> Burak
>
>
>
> On Mon, Dec 11, 2017 at 4:01 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> What about a custom streaming Sink that would stop the query after
>> addBatch has been called?
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://about.me/JacekLaskowski
>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> On Mon, Dec 11, 2017 at 9:15 AM, satyajit vegesna <
>> satyajit.apasprk@gmail.com> wrote:
>>
>>> Hi Jacek,
>>>
>>> For now , i am using Thread.sleep() on driver, to make sure my streaming
>>> query receives some data and and stop it, before the control reaches
>>> querying memory table.
>>> Let me know if there is any better way of handling it.
>>>
>>> Regards,
>>> Satyajit.
>>>
>>> On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <
>>> satyajit.apasprk@gmail.com> wrote:
>>>
>>>> Hi Jacek,
>>>>
>>>> Thank you for responding back,
>>>>
>>>> i have tried memory sink, and below is what i did
>>>>
>>>>  val fetchValue = debeziumRecords.selectExpr("value").withColumn("tableName",
>>>> functions.get_json_object($"value".cast(StringType), "$.schema.name"))
>>>>     .withColumn("operation", functions.get_json_object($"value".cast(StringType),
>>>> "$.payload.op"))
>>>>     .withColumn("payloadAfterValue", split(substring_index(debeziumRecords("value"),
>>>> "\"after\":" ,-1),",\"source\"").getItem(0))
>>>>     .drop("tableName").drop("operation").drop("value").as[String].writeStream
>>>>
>>>>     .outputMode(OutputMode.Append())
>>>>     .queryName("record")
>>>>     .format("memory")
>>>>     .start()
>>>>
>>>> spark.sql("select * from record").show(truncate = false) //i was
>>>> expecting to be able to use the record table to read the JSON string, but
>>>> the table is empty for the first call. And i do not see any dataframe
>>>> output after the first one
>>>>
>>>> *But yeah the above steps work good and i can do things that i need to,
>>>> in spark-shell, the problem is when i try to code in Intellij, because the
>>>> streaming query keeps running and i am not sure how to identify and stop
>>>> the streaming query and use record memory table.*
>>>>
>>>> So i would like to stop the streaming query once i know i have some
>>>> data in my record memory table(is there a way to do that), so i can stop
>>>> the streaming query and use the memory table, fetch my record.
>>>> Any help on how to approach the situation programmatically/any examples
>>>> pointed would highly be appreciated.
>>>>
>>>> Regards,
>>>> Satyajit.
>>>>
>>>>
>>>>
>>>> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> What about memory sink? That could work.
>>>>>
>>>>> Pozdrawiam,
>>>>> Jacek Laskowski
>>>>> ----
>>>>> https://about.me/JacekLaskowski
>>>>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>>>>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>>>>> Follow me at https://twitter.com/jaceklaskowski
>>>>>
>>>>> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
>>>>> satyajit.apasprk@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I would like to infer JSON schema from a sample of data that i
>>>>>> receive from, Kafka Streams(specific topic), and i have to infer the schema
>>>>>> as i am going to receive random JSON string with different schema for each
>>>>>> topic, so i chose to go ahead with below steps,
>>>>>>
>>>>>> a. readStream from Kafka(latest offset), from a single Kafka topic.
>>>>>> b. Some how to store the JSON string into val and infer the schema.
>>>>>> c. stop the stream.
>>>>>> d.Create new readStream(smallest offset) and use the above inferred
>>>>>> schema to process the JSON using spark provided JSON support, like
>>>>>> from_json, json_object and others and run my actuall business logic.
>>>>>>
>>>>>> Now i am not sure how to be successful with step(b). Any help would
>>>>>> be appreciated.
>>>>>> And would also like to know if there is any better approach.
>>>>>>
>>>>>> Regards,
>>>>>> Satyajit.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Infer JSON schema in structured streaming Kafka.

Posted by Burak Yavuz <br...@gmail.com>.
In Spark 2.2, you can read from Kafka in batch mode, and then use the json
reader to infer schema:

val df = spark.read.format("kafka")...
  .select($"value.cast("string"))
val json = spark.read.json(df)
val schema = json.schema

While the above should be slow (since you're reading almost all data in
Kafka in batch), but it would work.

My question to you is, do you think it's worth it? Why do you have a random
json schema being inputted to your Kafka stream? Can this randomness not
mess up everything in the future if someone messes up? Not having fixed,
known schemas with streaming data (or any data for that matter) is
dangerous for most purposes.
Just food for thought.

Best,
Burak



On Mon, Dec 11, 2017 at 4:01 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> What about a custom streaming Sink that would stop the query after
> addBatch has been called?
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Mon, Dec 11, 2017 at 9:15 AM, satyajit vegesna <
> satyajit.apasprk@gmail.com> wrote:
>
>> Hi Jacek,
>>
>> For now , i am using Thread.sleep() on driver, to make sure my streaming
>> query receives some data and and stop it, before the control reaches
>> querying memory table.
>> Let me know if there is any better way of handling it.
>>
>> Regards,
>> Satyajit.
>>
>> On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <
>> satyajit.apasprk@gmail.com> wrote:
>>
>>> Hi Jacek,
>>>
>>> Thank you for responding back,
>>>
>>> i have tried memory sink, and below is what i did
>>>
>>>  val fetchValue = debeziumRecords.selectExpr("value").withColumn("tableName",
>>> functions.get_json_object($"value".cast(StringType), "$.schema.name"))
>>>     .withColumn("operation", functions.get_json_object($"value".cast(StringType),
>>> "$.payload.op"))
>>>     .withColumn("payloadAfterValue", split(substring_index(debeziumRecords("value"),
>>> "\"after\":" ,-1),",\"source\"").getItem(0))
>>>     .drop("tableName").drop("operation").drop("value").as[String].writeStream
>>>
>>>     .outputMode(OutputMode.Append())
>>>     .queryName("record")
>>>     .format("memory")
>>>     .start()
>>>
>>> spark.sql("select * from record").show(truncate = false) //i was
>>> expecting to be able to use the record table to read the JSON string, but
>>> the table is empty for the first call. And i do not see any dataframe
>>> output after the first one
>>>
>>> *But yeah the above steps work good and i can do things that i need to,
>>> in spark-shell, the problem is when i try to code in Intellij, because the
>>> streaming query keeps running and i am not sure how to identify and stop
>>> the streaming query and use record memory table.*
>>>
>>> So i would like to stop the streaming query once i know i have some data
>>> in my record memory table(is there a way to do that), so i can stop the
>>> streaming query and use the memory table, fetch my record.
>>> Any help on how to approach the situation programmatically/any examples
>>> pointed would highly be appreciated.
>>>
>>> Regards,
>>> Satyajit.
>>>
>>>
>>>
>>> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> What about memory sink? That could work.
>>>>
>>>> Pozdrawiam,
>>>> Jacek Laskowski
>>>> ----
>>>> https://about.me/JacekLaskowski
>>>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>>>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>>>> Follow me at https://twitter.com/jaceklaskowski
>>>>
>>>> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
>>>> satyajit.apasprk@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I would like to infer JSON schema from a sample of data that i receive
>>>>> from, Kafka Streams(specific topic), and i have to infer the schema as i am
>>>>> going to receive random JSON string with different schema for each topic,
>>>>> so i chose to go ahead with below steps,
>>>>>
>>>>> a. readStream from Kafka(latest offset), from a single Kafka topic.
>>>>> b. Some how to store the JSON string into val and infer the schema.
>>>>> c. stop the stream.
>>>>> d.Create new readStream(smallest offset) and use the above inferred
>>>>> schema to process the JSON using spark provided JSON support, like
>>>>> from_json, json_object and others and run my actuall business logic.
>>>>>
>>>>> Now i am not sure how to be successful with step(b). Any help would be
>>>>> appreciated.
>>>>> And would also like to know if there is any better approach.
>>>>>
>>>>> Regards,
>>>>> Satyajit.
>>>>>
>>>>
>>>>
>>>
>>
>

Re: Infer JSON schema in structured streaming Kafka.

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

What about a custom streaming Sink that would stop the query after addBatch
has been called?

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 9:15 AM, satyajit vegesna <
satyajit.apasprk@gmail.com> wrote:

> Hi Jacek,
>
> For now , i am using Thread.sleep() on driver, to make sure my streaming
> query receives some data and and stop it, before the control reaches
> querying memory table.
> Let me know if there is any better way of handling it.
>
> Regards,
> Satyajit.
>
> On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <
> satyajit.apasprk@gmail.com> wrote:
>
>> Hi Jacek,
>>
>> Thank you for responding back,
>>
>> i have tried memory sink, and below is what i did
>>
>>  val fetchValue = debeziumRecords.selectExpr("value").withColumn("tableName",
>> functions.get_json_object($"value".cast(StringType), "$.schema.name"))
>>     .withColumn("operation", functions.get_json_object($"value".cast(StringType),
>> "$.payload.op"))
>>     .withColumn("payloadAfterValue", split(substring_index(debeziumRecords("value"),
>> "\"after\":" ,-1),",\"source\"").getItem(0))
>>     .drop("tableName").drop("operation").drop("value").as[String].writeStream
>>
>>     .outputMode(OutputMode.Append())
>>     .queryName("record")
>>     .format("memory")
>>     .start()
>>
>> spark.sql("select * from record").show(truncate = false) //i was
>> expecting to be able to use the record table to read the JSON string, but
>> the table is empty for the first call. And i do not see any dataframe
>> output after the first one
>>
>> *But yeah the above steps work good and i can do things that i need to,
>> in spark-shell, the problem is when i try to code in Intellij, because the
>> streaming query keeps running and i am not sure how to identify and stop
>> the streaming query and use record memory table.*
>>
>> So i would like to stop the streaming query once i know i have some data
>> in my record memory table(is there a way to do that), so i can stop the
>> streaming query and use the memory table, fetch my record.
>> Any help on how to approach the situation programmatically/any examples
>> pointed would highly be appreciated.
>>
>> Regards,
>> Satyajit.
>>
>>
>>
>> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>>> Hi,
>>>
>>> What about memory sink? That could work.
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://about.me/JacekLaskowski
>>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
>>>
>>> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
>>> satyajit.apasprk@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I would like to infer JSON schema from a sample of data that i receive
>>>> from, Kafka Streams(specific topic), and i have to infer the schema as i am
>>>> going to receive random JSON string with different schema for each topic,
>>>> so i chose to go ahead with below steps,
>>>>
>>>> a. readStream from Kafka(latest offset), from a single Kafka topic.
>>>> b. Some how to store the JSON string into val and infer the schema.
>>>> c. stop the stream.
>>>> d.Create new readStream(smallest offset) and use the above inferred
>>>> schema to process the JSON using spark provided JSON support, like
>>>> from_json, json_object and others and run my actuall business logic.
>>>>
>>>> Now i am not sure how to be successful with step(b). Any help would be
>>>> appreciated.
>>>> And would also like to know if there is any better approach.
>>>>
>>>> Regards,
>>>> Satyajit.
>>>>
>>>
>>>
>>
>

Re: Infer JSON schema in structured streaming Kafka.

Posted by satyajit vegesna <sa...@gmail.com>.
Hi Jacek,

For now , i am using Thread.sleep() on driver, to make sure my streaming
query receives some data and and stop it, before the control reaches
querying memory table.
Let me know if there is any better way of handling it.

Regards,
Satyajit.

On Sun, Dec 10, 2017 at 10:43 PM, satyajit vegesna <
satyajit.apasprk@gmail.com> wrote:

> Hi Jacek,
>
> Thank you for responding back,
>
> i have tried memory sink, and below is what i did
>
>  val fetchValue = debeziumRecords.selectExpr("
> value").withColumn("tableName", functions.get_json_object($"value".cast(StringType),
> "$.schema.name"))
>     .withColumn("operation", functions.get_json_object($"value".cast(StringType),
> "$.payload.op"))
>     .withColumn("payloadAfterValue", split(substring_index(debeziumRecords("value"),
> "\"after\":" ,-1),",\"source\"").getItem(0))
>     .drop("tableName").drop("operation").drop("value").as[String].writeStream
>
>     .outputMode(OutputMode.Append())
>     .queryName("record")
>     .format("memory")
>     .start()
>
> spark.sql("select * from record").show(truncate = false) //i was expecting
> to be able to use the record table to read the JSON string, but the table
> is empty for the first call. And i do not see any dataframe output after
> the first one
>
> *But yeah the above steps work good and i can do things that i need to, in
> spark-shell, the problem is when i try to code in Intellij, because the
> streaming query keeps running and i am not sure how to identify and stop
> the streaming query and use record memory table.*
>
> So i would like to stop the streaming query once i know i have some data
> in my record memory table(is there a way to do that), so i can stop the
> streaming query and use the memory table, fetch my record.
> Any help on how to approach the situation programmatically/any examples
> pointed would highly be appreciated.
>
> Regards,
> Satyajit.
>
>
>
> On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> What about memory sink? That could work.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://about.me/JacekLaskowski
>> Spark Structured Streaming https://bit.ly/spark-structured-streaming
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
>> satyajit.apasprk@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I would like to infer JSON schema from a sample of data that i receive
>>> from, Kafka Streams(specific topic), and i have to infer the schema as i am
>>> going to receive random JSON string with different schema for each topic,
>>> so i chose to go ahead with below steps,
>>>
>>> a. readStream from Kafka(latest offset), from a single Kafka topic.
>>> b. Some how to store the JSON string into val and infer the schema.
>>> c. stop the stream.
>>> d.Create new readStream(smallest offset) and use the above inferred
>>> schema to process the JSON using spark provided JSON support, like
>>> from_json, json_object and others and run my actuall business logic.
>>>
>>> Now i am not sure how to be successful with step(b). Any help would be
>>> appreciated.
>>> And would also like to know if there is any better approach.
>>>
>>> Regards,
>>> Satyajit.
>>>
>>
>>
>

Re: Infer JSON schema in structured streaming Kafka.

Posted by satyajit vegesna <sa...@gmail.com>.
Hi Jacek,

Thank you for responding back,

i have tried memory sink, and below is what i did

 val fetchValue =
debeziumRecords.selectExpr("value").withColumn("tableName",
functions.get_json_object($"value".cast(StringType), "$.schema.name"))
    .withColumn("operation",
functions.get_json_object($"value".cast(StringType), "$.payload.op"))
    .withColumn("payloadAfterValue",
split(substring_index(debeziumRecords("value"), "\"after\":"
,-1),",\"source\"").getItem(0))

.drop("tableName").drop("operation").drop("value").as[String].writeStream

    .outputMode(OutputMode.Append())
    .queryName("record")
    .format("memory")
    .start()

spark.sql("select * from record").show(truncate = false) //i was expecting
to be able to use the record table to read the JSON string, but the table
is empty for the first call. And i do not see any dataframe output after
the first one

*But yeah the above steps work good and i can do things that i need to, in
spark-shell, the problem is when i try to code in Intellij, because the
streaming query keeps running and i am not sure how to identify and stop
the streaming query and use record memory table.*

So i would like to stop the streaming query once i know i have some data in
my record memory table(is there a way to do that), so i can stop the
streaming query and use the memory table, fetch my record.
Any help on how to approach the situation programmatically/any examples
pointed would highly be appreciated.

Regards,
Satyajit.



On Sun, Dec 10, 2017 at 9:52 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> What about memory sink? That could work.
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
> satyajit.apasprk@gmail.com> wrote:
>
>> Hi All,
>>
>> I would like to infer JSON schema from a sample of data that i receive
>> from, Kafka Streams(specific topic), and i have to infer the schema as i am
>> going to receive random JSON string with different schema for each topic,
>> so i chose to go ahead with below steps,
>>
>> a. readStream from Kafka(latest offset), from a single Kafka topic.
>> b. Some how to store the JSON string into val and infer the schema.
>> c. stop the stream.
>> d.Create new readStream(smallest offset) and use the above inferred
>> schema to process the JSON using spark provided JSON support, like
>> from_json, json_object and others and run my actuall business logic.
>>
>> Now i am not sure how to be successful with step(b). Any help would be
>> appreciated.
>> And would also like to know if there is any better approach.
>>
>> Regards,
>> Satyajit.
>>
>
>

Re: Infer JSON schema in structured streaming Kafka.

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi,

What about memory sink? That could work.

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

On Mon, Dec 11, 2017 at 3:28 AM, satyajit vegesna <
satyajit.apasprk@gmail.com> wrote:

> Hi All,
>
> I would like to infer JSON schema from a sample of data that i receive
> from, Kafka Streams(specific topic), and i have to infer the schema as i am
> going to receive random JSON string with different schema for each topic,
> so i chose to go ahead with below steps,
>
> a. readStream from Kafka(latest offset), from a single Kafka topic.
> b. Some how to store the JSON string into val and infer the schema.
> c. stop the stream.
> d.Create new readStream(smallest offset) and use the above inferred schema
> to process the JSON using spark provided JSON support, like from_json,
> json_object and others and run my actuall business logic.
>
> Now i am not sure how to be successful with step(b). Any help would be
> appreciated.
> And would also like to know if there is any better approach.
>
> Regards,
> Satyajit.
>