Posted to dev@nifi.apache.org by Martin Ebert <ma...@gmx.de> on 2020/02/01 10:25:37 UTC

Processor for Delta Lake / Real Time Data Warehouse

Hi community,
how can we drive this topic forward? A Jira issue has been created:
https://issues.apache.org/jira/projects/NIFI/issues/NIFI-6976

"A table in Delta Lake is both a batch table, as well as a streaming source
and sink. Streaming data ingest, batch historic backfill, and interactive
queries all just work out of the box." (Delta.io)

This is the decisive argument for me. A very impressive technological
milestone that is just crying out to be implemented in NiFi. You can find
all the details in the video here: https://youtu.be/VLd_qOrKrTI

Delta Lake is related to Databricks, Athena and Presto. In our case it
would be great to extract data from a database or any other source (can be
streaming) and send this data or stream to our Databricks cluster.

I imagine it just like in the video: you have a Delta Lake processor where
you can define which Databricks cluster the data should go to and which
Delta Lake operation (upsert, merge, delete, ...) should be applied to the
data. That means Databricks is only the executing component and I don't
have to write code in Databricks notebooks anymore. I also like the
option of requesting an extra cluster from the processor.

Being able to do the same with Athena and Presto would be a dream!

Re: Processor for Delta Lake / Real Time Data Warehouse

Posted by Mike Thomsen <mi...@gmail.com>.
You might want to look at this as a starting point instead:
https://github.com/delta-io/connectors

I've been involved in some unofficial talks about building such a processor
for a little while, but the opportunity hasn't quite lined up yet because
Delta's public API for non-Spark users is still maturing.

In the short term, nothing is stopping you from integrating NiFi and Spark
here to get a similar thing going. Just build Parquet files and stream them
to a place where Spark + Delta can read them and integrate them into your
Delta Lake.
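
The short-term route above could look roughly like this on the Spark side.
This is a minimal sketch, assuming PySpark with the delta-spark package
installed. The function names, the landing directory, the table path and
the key columns are all hypothetical, and the merge policy (update matching
rows, insert the rest) is only one possible choice.

```python
from typing import Sequence


def merge_condition(keys: Sequence[str], target: str = "t", source: str = "s") -> str:
    """Build the ON clause of a Delta MERGE from the key column names."""
    if not keys:
        raise ValueError("at least one key column is required")
    return " AND ".join(f"{target}.{k} = {source}.{k}" for k in keys)


def merge_parquet_into_delta(spark, parquet_dir: str, delta_path: str,
                             keys: Sequence[str]) -> None:
    """Upsert the Parquet files that NiFi wrote to parquet_dir into a Delta table."""
    # Imported lazily so merge_condition() stays usable without Spark installed.
    from delta.tables import DeltaTable

    incoming = spark.read.parquet(parquet_dir)

    if not DeltaTable.isDeltaTable(spark, delta_path):
        # First run: there is no table yet, so create it from this batch.
        incoming.write.format("delta").save(delta_path)
        return

    (
        DeltaTable.forPath(spark, delta_path)
        .alias("t")
        .merge(incoming.alias("s"), merge_condition(keys))
        .whenMatchedUpdateAll()      # rows with a matching key are overwritten
        .whenNotMatchedInsertAll()   # new keys are appended
        .execute()
    )
```

A call such as merge_parquet_into_delta(spark, "/data/landing/orders",
"/data/delta/orders", ["id"]) would then run on whatever schedule suits the
flow, with NiFi only responsible for dropping Parquet files into the landing
directory.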

Re: Processor for Delta Lake / Real Time Data Warehouse

Posted by Mike Thomsen <mi...@gmail.com>.
For the NiFi side, I've set things up locally using a Parquet setting to
write the files locally to disk and then had standalone Spark read those
into a Delta Lake. It was very easy to do using basic info from public
tutorials on Parquet, Delta, SparkSQL, etc. I don't have any code to share
right now, but this is a rough sketch for NiFi:

1. Write an Avro schema for a simple data set.
2. Generate some sample data.
3. Load the data and put it through the Parquet processors with a Hadoop
configuration file that lets them write to local disk or S3.
4. Have a Spark process in place that will read the input Parquet folder
and merge it into the Delta table according to your preferences.
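
Steps 1 and 2 of the list above can be sketched in a few lines. This is
only an illustration: the record name, namespace and fields are made up,
and the generated JSON lines are just one convenient way to feed test data
into a NiFi record reader that uses the same schema.

```python
import json
import random

# Hypothetical record layout; replace the fields with those of your own data set.
AVRO_SCHEMA = {
    "type": "record",
    "name": "SensorReading",
    "namespace": "example.nifi",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "sensor", "type": "string"},
        {"name": "value", "type": "double"},
        # Optional field: in Avro a nullable union lists "null" first when the default is null.
        {"name": "note", "type": ["null", "string"], "default": None},
    ],
}


def schema_json() -> str:
    """Return the schema as JSON text, e.g. for pasting into a schema property."""
    return json.dumps(AVRO_SCHEMA, indent=2)


def sample_records(n: int, seed: int = 0) -> list:
    """Generate n reproducible sample records that match AVRO_SCHEMA."""
    rng = random.Random(seed)
    return [
        {
            "id": i,
            "sensor": f"sensor-{rng.randint(1, 5)}",
            "value": round(rng.uniform(0.0, 100.0), 2),
            "note": None,
        }
        for i in range(n)
    ]


def to_json_lines(records) -> str:
    """Serialize records as one JSON object per line."""
    return "\n".join(json.dumps(r) for r in records)


if __name__ == "__main__":
    print(schema_json())
    print(to_json_lines(sample_records(5)))
```

Keeping the seed fixed makes the sample data reproducible, which helps when
comparing what ends up in the Parquet files and later in the Delta table.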

Beyond that, I think you'll need to go over to the Delta mailing
list/Google Group and ask them about best practices like going from bronze
-> silver -> gold.

Re: Processor for Delta Lake / Real Time Data Warehouse

Posted by Martin Ebert <ma...@gmx.de>.
Can anyone share a concrete minimal example with NiFi?

Scenario:
I have two tables in Parquet format in S3. This is my source.
Step 1 Bronze Layer / Raw
In step 1 I would like to consume these tables (ListS3 + FetchS3Object) and
save each as a Delta table in S3 (new Delta processor). For example, I use
overwrite mode on each execution. I expect that things like partitions and
the transaction logs are created correctly.
Step 2 Silver Layer / Prep
Now I read the two Delta tables (new Delta processor) from the previous
step as batch or stream, filter my data (configurable in the Delta
processor) and save the new Delta tables in my Silver Layer.
Step 3 Gold Layer / Analyze
Now I read the two Delta tables from the previous step as batch or stream
and "connect" my data (join, merge, delete, ...). The results will end up
as Delta in S3 or as a single Parquet file (depending on the next steps).
This is how I see the current process.
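
Without a dedicated processor, the three layers described above could be
sketched as plain batch Spark jobs. This is a rough illustration under
stated assumptions: PySpark with delta-spark is available, every function
name, path and column name is hypothetical, and overwrite mode with an
inner join stands in for whatever write and "connect" logic is really
needed.

```python
LAYERS = ("bronze", "silver", "gold")


def layer_path(base: str, layer: str, table: str) -> str:
    """Build the storage path of a table inside one of the three layers."""
    if layer not in LAYERS:
        raise ValueError(f"unknown layer: {layer}")
    return f"{base.rstrip('/')}/{layer}/{table}"


def to_bronze(spark, source_parquet: str, bronze_path: str, partition_by=None) -> None:
    """Step 1: copy the raw Parquet source into a Delta table, overwriting each run."""
    writer = spark.read.parquet(source_parquet).write.format("delta").mode("overwrite")
    if partition_by:
        writer = writer.partitionBy(*partition_by)
    # Saving in Delta format also creates the _delta_log transaction log.
    writer.save(bronze_path)


def to_silver(spark, bronze_path: str, silver_path: str, row_filter: str) -> None:
    """Step 2: read a bronze table, keep the rows matching row_filter, save as silver."""
    (
        spark.read.format("delta").load(bronze_path)
        .filter(row_filter)  # SQL expression string, e.g. "value IS NOT NULL"
        .write.format("delta").mode("overwrite").save(silver_path)
    )


def to_gold(spark, left_path: str, right_path: str, gold_path: str, join_keys) -> None:
    """Step 3: join two silver tables on join_keys and save the result as gold."""
    left = spark.read.format("delta").load(left_path)
    right = spark.read.format("delta").load(right_path)
    (
        left.join(right, on=list(join_keys), how="inner")
        .write.format("delta").mode("overwrite").save(gold_path)
    )
```

For example, layer_path("s3a://my-bucket/lake", "bronze", "orders") gives
the location to pass to to_bronze, and the same helper yields the silver
and gold locations for the later steps.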

There are two things that are not quite clear to me:
1) What exactly does this look like: "Delta tables can be registered in the
metastore and you can enable Delta SQL commands with Spark"?
2) How do I run the SQL scripts on Databricks? Simply by using the JDBC
connection to Databricks?

Re: Processor for Delta Lake / Real Time Data Warehouse

Posted by Joey Frazee <jo...@icloud.com.INVALID>.
Delta tables can be registered in the metastore and you can enable Delta SQL commands with Spark.

And there are integrations with other engines for the read path.

This isn’t to say a native integration isn’t needed; it is. But there are some options today.

-joey
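
To make the metastore registration mentioned above a bit more concrete,
here is a minimal sketch. It assumes PySpark with a reasonably recent
delta-spark release and a Hive-compatible metastore; the helper names, the
table name and the location are hypothetical, and the names are inserted
into the SQL text without any escaping.

```python
def register_sql(table: str, location: str) -> str:
    """Build the SQL that registers an existing Delta directory as a table."""
    return f"CREATE TABLE IF NOT EXISTS {table} USING DELTA LOCATION '{location}'"


def delta_session(app_name: str):
    """Create a Spark session with the Delta SQL extension and catalog enabled."""
    # Imported lazily so register_sql() stays usable without Spark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder.appName(app_name)
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .enableHiveSupport()
        .getOrCreate()
    )


def register_delta_table(spark, table: str, location: str) -> None:
    """Register the Delta table stored at location under the given table name."""
    spark.sql(register_sql(table, location))
```

Once a table is registered this way, it can be addressed by name in Spark
SQL instead of by path.
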
Re: Processor for Delta Lake / Real Time Data Warehouse

Posted by Mike Thomsen <mi...@gmail.com>.
> But the specific use case you described can be done today using Nifi’s
> out-of-the-box SQL processors and JDBC/ODBC with all of the mentioned
> databases/engines.

I'm not aware of any direct JDBC route that would allow NiFi to use the SQL
processors to hit Delta Lake. Would like to be proved wrong here (heck, it
would make some of our use cases easier!) but AFAIK you have to have NiFi
build Parquet and push to S3 or HDFS so Spark can do the transformation.

Re: Processor for Delta Lake / Real Time Data Warehouse

Posted by Joey Frazee <jo...@icloud.com.INVALID>.
Martin, I’ve been thinking about this one for a while but I think it needs to be considered in the context of transactional table formats in general; i.e., the incubating Apache Hudi and Apache Iceberg too.

There are things that are inconvenient for NiFi to do with these table formats.

But the specific use case you described can be done today using NiFi’s out-of-the-box SQL processors and JDBC/ODBC with all of the mentioned databases/engines.

-joey