You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2021/05/02 09:45:42 UTC

Re: Delivery Status Notification (Failure)

This message is in two parts

Hi,

I did some tests on these. The idea being running  Spark Structured
Streaming (SSS) on a collection of records since the last run of SSS and
shutdown SSS job.

Some parts of this approach has been described in the following databricks
blog

Running Streaming Jobs Once a Day For 10x Cost Savings
<https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html>
However, real life is more complicated than that.

Let us look at a typical example as depicted in my lousy diagram attached

sources (trail files) --> Kafka --> Flume --> write to Cloud storage
(mounted locally) --> SSS --> BigQuery


What I have in here is a typical example of *trail files* produced by
source. This could be some CDC like Oracle Golden Gate sending
committed logs or anything that writes to files. We use Kafka to ingest
these files and we use Apache flume
<http://flume.apache.org/#:~:text=Welcome%20to%20Apache%20Flume%20%C2%B6%20Flume%20is%20a,reliability%20mechanisms%20and%20many%20failover%20and%20recovery%20mechanisms.>
to
move these files onto Google Cloud Storage (gs:// ) *mounted as a local
file system* in the edge node through Cloud Storage Fuse
<https://cloud.google.com/storage/docs/gcs-fuse>


The advantage of these storage types is that both on-premise and cloud
applications can take care of it.


For on-premise the mount point would be say data_path =
"file:///mnt/gs/prices/data/" where gs://etcbucket is mounted as /mnt/gs


For cloud reference it would be data_path = "etcbucket/prices/data/"


We use Flume's file_roll type for this storage


This is an event driven architecture and our interest is to process these
trail files through SSS at a predetermined interval or when needed.
However, the caveat is that the volume should be containable meaning SSS
can process it as required. The code base in PySpark is as follows:


      data_path = "etcbucket/prices/data/"

      checkpoint_path = "etcbucket/prices/chkpt/"

        try:

            streamingDataFrame = self.spark \

                .readStream \

                .option('newFilesOnly', 'true') \

                .option('header', 'true') \

                .option('maxFilesPerTrigger', 10000) \

                .option('latestFirst', 'false') \

                .text(data_path) \

                .select(from_json(col("value").cast("string"),
schema).alias("parsed_value"))


            streamingDataFrame.printSchema()

            result = streamingDataFrame.select( \

                     col("parsed_value.rowkey").alias("rowkey") \

                   , col("parsed_value.ticker").alias("ticker") \

                   , col("parsed_value.timeissued").alias("timeissued") \

                   , col("parsed_value.price").alias("price")). \

                     writeStream. \

                     outputMode('append'). \

                     option("truncate", "false"). \

                     *foreachBatch(sendToSink). \*

                     queryName('trailFiles'). \

   *                  trigger(once = True). \*

  *                   option('checkpointLocation', checkpoint_path). \*

                     start(data_path)

        except Exception as e:

                print(f"""{e}, quitting""")

                sys.exit(1)

def sendToSink(df, batchId):
    if(len(df.take(1))) > 0:
        print(f"""batchId is {batchId}""")
        df.show(100,False)
        df. persist()
        # write to BigQuery batch table
        s.writeTableToBQ(df, "append",
config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
        df.unpersist()
        print(f"""wrote to DB""")
    else:
        print("DataFrame is empty")



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 2 May 2021 at 10:42, Mail Delivery Subsystem <
mailer-daemon@googlemail.com> wrote:

> [image: Error Icon]
> Message too large
> Your message couldn't be delivered to *user@spark.apache.org* because it
> exceeds the size limit. Try reducing the message size and then resending
> it.
> The response from the remote server was:
>
> 552 5.3.4 Message size exceeds fixed limit
>
>
>
> ---------- Forwarded message ----------
> From: Mich Talebzadeh <mi...@gmail.com>
> To:
> Cc: user <us...@spark.apache.org>
> Bcc:
> Date: Sun, 2 May 2021 10:42:09 +0100
> Subject: Re: Spark Streaming with Files
> ----- Message truncated -----

Re: Delivery Status Notification (Failure)

Posted by Mich Talebzadeh <mi...@gmail.com>.
Part 2

In this case, we are simply counting the number of rows to be ingested once
before SSS terminates. This is shown in the above method


batchId is 0

 Total records processed in this run = 3107

wrote to DB

So it shows batchId (0) and the total records count() and writes to
BigQuery table and terminates

wait and start again, it should pickup from the next batchId

batchId is 1
 Total records processed in this run = 80
wrote to DB
hduser@rhes76: /home/hduser/dba/bin/python/DSBQ/src>

*What checkpoint directory has*

 /mnt/gs/prices/chkpt> ltr
total 1
-rw-r--r--. 1 hduser hadoop 45 May  2 09:35 metadata
drwxr-xr-x. 1 hduser hadoop  0 May  2 09:35 offsets
drwxr-xr-x. 1 hduser hadoop  0 May  2 09:35 commits
drwxr-xr-x. 1 hduser hadoop  0 May  2 09:35 sources

cat metadata
{"id":"cc3a9459-2a9d-4740-a280-e5b5d333d098"}

cd offsets/
/mnt/gs/prices/chkpt/offsets> ltr
total 2
-rw-r--r--. 1 hduser hadoop 529 May  2 09:35 0
-rw-r--r--. 1 hduser hadoop 529 May  2 09:39 1

> cat 0
v1
{"batchWatermarkMs":0,"batchTimestampMs":1619944526698,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"logOffset":0}

cat 1
v1
{"batchWatermarkMs":0,"batchTimestampMs":1619944796208,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.join.stateFormatVersion":"2","spark.sql.streaming.stateStore.compression.codec":"lz4","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"logOffset":1}


So there are two offsets for each run (0,1) with different
"batchTimestampMs", namely "batchTimestampMs":1619944526698 and
 "batchTimestampMs":1619944796208 respectively


*How to trigger SSS for each run*


The SSS job can be triggered in many ways. Can use simple cron on prem,
autoSys, Airflow on prem or composer in cloud.


If there is nothing in the queue (source stopped say), SSS will come back
with


DataFrame is empty


and terminates. This logic is already built in the sendToSink() method.


*Conclusion*


I am not sure not running compute with SSS is going to save a lot. Surely
compute process will be run as needed and that saves some dollars but the
whole infra-structure has to be there and the lion cost goes there. If the
idea is to run CDC once or twice a day, then it is equally fine to schedule
SSS to start at certain intervals. The import thing to realise is that SSS
will pickup from the records left through the checkpoint directory. If
checkpoint directory is lost or content deleted, SSS will process all the
records from batchId 0.


HTH,


Mich



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 2 May 2021 at 10:45, Mich Talebzadeh <mi...@gmail.com>
wrote:

> This message is in two parts
>
> Hi,
>
> I did some tests on these. The idea being running  Spark Structured
> Streaming (SSS) on a collection of records since the last run of SSS and
> shutdown SSS job.
>
> Some parts of this approach has been described in the following databricks
> blog
>
> Running Streaming Jobs Once a Day For 10x Cost Savings
> <https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html>
> However, real life is more complicated than that.
>
> Let us look at a typical example as depicted in my lousy diagram attached
>
> sources (trail files) --> Kafka --> Flume --> write to Cloud storage
> (mounted locally) --> SSS --> BigQuery
>
>
> What I have in here is a typical example of *trail files* produced by
> source. This could be some CDC like Oracle Golden Gate sending
> committed logs or anything that writes to files. We use Kafka to ingest
> these files and we use Apache flume
> <http://flume.apache.org/#:~:text=Welcome%20to%20Apache%20Flume%20%C2%B6%20Flume%20is%20a,reliability%20mechanisms%20and%20many%20failover%20and%20recovery%20mechanisms.> to
> move these files onto Google Cloud Storage (gs:// ) *mounted as a local
> file system* in the edge node through Cloud Storage Fuse
> <https://cloud.google.com/storage/docs/gcs-fuse>
>
>
> The advantage of these storage types is that both on-premise and cloud
> applications can take care of it.
>
>
> For on-premise the mount point would be say data_path =
> "file:///mnt/gs/prices/data/" where gs://etcbucket is mounted as /mnt/gs
>
>
> For cloud reference it would be data_path = "etcbucket/prices/data/"
>
>
> We use Flume's file_roll type for this storage
>
>
> This is an event driven architecture and our interest is to process these
> trail files through SSS at a predetermined interval or when needed.
> However, the caveat is that the volume should be containable meaning SSS
> can process it as required. The code base in PySpark is as follows:
>
>
>       data_path = "etcbucket/prices/data/"
>
>       checkpoint_path = "etcbucket/prices/chkpt/"
>
>         try:
>
>             streamingDataFrame = self.spark \
>
>                 .readStream \
>
>                 .option('newFilesOnly', 'true') \
>
>                 .option('header', 'true') \
>
>                 .option('maxFilesPerTrigger', 10000) \
>
>                 .option('latestFirst', 'false') \
>
>                 .text(data_path) \
>
>                 .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
>
>
>             streamingDataFrame.printSchema()
>
>             result = streamingDataFrame.select( \
>
>                      col("parsed_value.rowkey").alias("rowkey") \
>
>                    , col("parsed_value.ticker").alias("ticker") \
>
>                    , col("parsed_value.timeissued").alias("timeissued") \
>
>                    , col("parsed_value.price").alias("price")). \
>
>                      writeStream. \
>
>                      outputMode('append'). \
>
>                      option("truncate", "false"). \
>
>                      *foreachBatch(sendToSink). \*
>
>                      queryName('trailFiles'). \
>
>    *                  trigger(once = True). \*
>
>   *                   option('checkpointLocation', checkpoint_path). \*
>
>                      start(data_path)
>
>         except Exception as e:
>
>                 print(f"""{e}, quitting""")
>
>                 sys.exit(1)
>
> def sendToSink(df, batchId):
>     if(len(df.take(1))) > 0:
>         print(f"""batchId is {batchId}""")
>         df.show(100,False)
>         df. persist()
>         # write to BigQuery batch table
>         s.writeTableToBQ(df, "append",
> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>         df.unpersist()
>         print(f"""wrote to DB""")
>     else:
>         print("DataFrame is empty")
>
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 2 May 2021 at 10:42, Mail Delivery Subsystem <
> mailer-daemon@googlemail.com> wrote:
>
>> [image: Error Icon]
>> Message too large
>> Your message couldn't be delivered to *user@spark.apache.org* because it
>> exceeds the size limit. Try reducing the message size and then resending
>> it.
>> The response from the remote server was:
>>
>> 552 5.3.4 Message size exceeds fixed limit
>>
>>
>>
>> ---------- Forwarded message ----------
>> From: Mich Talebzadeh <mi...@gmail.com>
>> To:
>> Cc: user <us...@spark.apache.org>
>> Bcc:
>> Date: Sun, 2 May 2021 10:42:09 +0100
>> Subject: Re: Spark Streaming with Files
>> ----- Message truncated -----
>
>