You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@nifi.apache.org by Paul Parker <ni...@gmail.com> on 2020/04/02 14:42:55 UTC

Re: How to use delta storage format

@Mike I'd appreciate some feedback.

Paul Parker <ni...@gmail.com> schrieb am Di., 31. März 2020, 17:07:

> Let me share answers from the delta community:
>
> Answer to Q1:
> Structured streaming queries can do commits every minute, even every 20-30
> seconds. This definitely creates small files. But that is okay, because it
> is expected that people will periodically compact the files. The same
> timing should work fine for Nifi and any other streaming engine. It does
> create 1000-ish versions per day, but that is okay.
>
> Answer to Q2:
> That is up to the sink implementation. Both are okay. In fact, it probably
> can be combination of both, as long as we dont commit every second. That
> may not scale well.
>
> Answer to Q3:
> You need a primary node which is responsible for managing the Delta table.
> That note would be responsible for reading the log, parsing it, updating
> it, etc. Unfortunately, we have no good non-spark way to read the log, and
> much less write to the log.
> there is an experimental uber jar that tries to package delta + spark
> together into a single jar ... using which you could read the log. its
> available here - https://github.com/delta-io/connectors/
>
> What it does is it basically runs a local-mode (in-process) spark to read
> the log. This what we are using to build a hive connector, that will allow
> hive to read delta files. Now the goal of that was to only read. your goal
> is to write which is definitely more complicated, because for that you have
> to do much more. Now that uber jar has all the necessary code to do the
> writing. ... which you could use. but there has to be a driver node which
> has to collect all the parquet files written by other nodes and atomically
> commit those parquet files to the Delta log to make them visible to all
> readers.
>
> Does the first orientation help?
>
>
> Mike Thomsen <mi...@gmail.com> schrieb am So., 29. März 2020,
> 20:29:
>
>> It looks like a lot of their connectors rely on external management by
>> Spark. That is true of Hive, and also of Athena/Presto unless I misread the
>> documentation. Read some of the fine print near the bottom of this to get
>> an idea of what I mean:
>>
>> https://github.com/delta-io/connectors
>>
>> Hypothetically, we could build a connector for NiFi, but there are some
>> things about the design of Delta Lake that I am not sure about based on my
>> own research and what not. Roughly, they are the following:
>>
>> 1. What is a good timing strategy for doing commits to Delta Lake?
>> 2. What would trigger a commit in the first place?
>> 3. Is there a good way to trigger commits that would work within the
>> masterless cluster design of clustered NiFi instead of requiring a special
>> "primary node only" processor for executing commits?
>>
>> Based on my experimentation, one of the biggest questions around the
>> first point is do you really want potentially thousands or tens of
>> thousands of time shift events to be created throughout the day? A record
>> processor that reads in a ton of small record sets and injects that into
>> the Delta Lake would create a ton of these checkpoints, and they'd be
>> largely meaningless to people trying to make sense of them for the purpose
>> of going back and forth in time between versions.
>>
>> Do we trigger a commit per record set or set a timer?
>>
>> Most of us on the NiFi dev side have no real experience here. It would be
>> helpful for us to get some ideas to form use cases from the community
>> because there are some big gaps on how we'd even start to shape the
>> requirements.
>>
>> On Sun, Mar 29, 2020 at 1:28 PM Paul Parker <ni...@gmail.com>
>> wrote:
>>
>>> Hi Mike,
>>> your alternate suggestion sounds good. But how does it work if I want to
>>> keep this running continuously? In other words, the delta table should be
>>> continuously updated. Finally, this is one of the biggest advantages of
>>> Delta: you can ingest batch and streaming data into one table.
>>>
>>> I also think about workarounds (Use Athena, Presto or Redshift with
>>> Nifi):
>>> "Here is the list of integrations that enable you to access Delta
>>> tables from external data processing engines.
>>>
>>>    - Presto and Athena to Delta Lake Integration
>>>    <https://docs.delta.io/latest/presto-integration.html>
>>>    - Redshift Spectrum to Delta Lake Integration
>>>    <https://docs.delta.io/latest/redshift-spectrum-integration.html>
>>>    - Snowflake to Delta Lake Integration
>>>    <https://docs.delta.io/latest/snowflake-integration.html>
>>>    - Apache Hive to Delta Lake Integration
>>>    <https://docs.delta.io/latest/hive-integration.html>"
>>>
>>> Source:
>>> https://docs.delta.io/latest/integrations.html
>>>
>>> I am looking forward to further ideas from the community.
>>>
>>> Mike Thomsen <mi...@gmail.com> schrieb am So., 29. März 2020,
>>> 17:23:
>>>
>>>> I think there is a connector for Hive that works with Delta. You could
>>>> try setting up Hive to work with Delta and then using NiFi to feed Hive.
>>>> Alternatively, you can simply convert the results of the JDBC query into a
>>>> Parquet file, push to a desired location and run a Spark job to convert
>>>> from Parquet to Delta (that should be pretty fast because Delta is
>>>> basically a fork of Parquet).
>>>>
>>>> On Fri, Mar 27, 2020 at 2:13 PM Paul Parker <ni...@gmail.com>
>>>> wrote:
>>>>
>>>>> We read data via JDBC from a database and want to save the results as
>>>>> a delta table and then read them again. How can I realize this with Nifi
>>>>> and Hive or Glue Metastore?
>>>>>
>>>>

Re: How to use delta storage format

Posted by Mike Thomsen <mi...@gmail.com>.
Interesting. You've given me a lot to think about in terms of designing
this.

On Thu, Apr 2, 2020 at 10:43 AM Paul Parker <ni...@gmail.com> wrote:

> @Mike I'd appreciate some feedback.
>
> Paul Parker <ni...@gmail.com> schrieb am Di., 31. März 2020, 17:07:
>
>> Let me share answers from the delta community:
>>
>> Answer to Q1:
>> Structured streaming queries can do commits every minute, even every
>> 20-30 seconds. This definitely creates small files. But that is okay,
>> because it is expected that people will periodically compact the files. The
>> same timing should work fine for Nifi and any other streaming engine. It
>> does create 1000-ish versions per day, but that is okay.
>>
>> Answer to Q2:
>> That is up to the sink implementation. Both are okay. In fact, it
>> probably can be combination of both, as long as we dont commit every
>> second. That may not scale well.
>>
>> Answer to Q3:
>> You need a primary node which is responsible for managing the Delta
>> table. That note would be responsible for reading the log, parsing it,
>> updating it, etc. Unfortunately, we have no good non-spark way to read the
>> log, and much less write to the log.
>> there is an experimental uber jar that tries to package delta + spark
>> together into a single jar ... using which you could read the log. its
>> available here - https://github.com/delta-io/connectors/
>>
>> What it does is it basically runs a local-mode (in-process) spark to read
>> the log. This what we are using to build a hive connector, that will allow
>> hive to read delta files. Now the goal of that was to only read. your goal
>> is to write which is definitely more complicated, because for that you have
>> to do much more. Now that uber jar has all the necessary code to do the
>> writing. ... which you could use. but there has to be a driver node which
>> has to collect all the parquet files written by other nodes and atomically
>> commit those parquet files to the Delta log to make them visible to all
>> readers.
>>
>> Does the first orientation help?
>>
>>
>> Mike Thomsen <mi...@gmail.com> schrieb am So., 29. März 2020,
>> 20:29:
>>
>>> It looks like a lot of their connectors rely on external management by
>>> Spark. That is true of Hive, and also of Athena/Presto unless I misread the
>>> documentation. Read some of the fine print near the bottom of this to get
>>> an idea of what I mean:
>>>
>>> https://github.com/delta-io/connectors
>>>
>>> Hypothetically, we could build a connector for NiFi, but there are some
>>> things about the design of Delta Lake that I am not sure about based on my
>>> own research and what not. Roughly, they are the following:
>>>
>>> 1. What is a good timing strategy for doing commits to Delta Lake?
>>> 2. What would trigger a commit in the first place?
>>> 3. Is there a good way to trigger commits that would work within the
>>> masterless cluster design of clustered NiFi instead of requiring a special
>>> "primary node only" processor for executing commits?
>>>
>>> Based on my experimentation, one of the biggest questions around the
>>> first point is do you really want potentially thousands or tens of
>>> thousands of time shift events to be created throughout the day? A record
>>> processor that reads in a ton of small record sets and injects that into
>>> the Delta Lake would create a ton of these checkpoints, and they'd be
>>> largely meaningless to people trying to make sense of them for the purpose
>>> of going back and forth in time between versions.
>>>
>>> Do we trigger a commit per record set or set a timer?
>>>
>>> Most of us on the NiFi dev side have no real experience here. It would
>>> be helpful for us to get some ideas to form use cases from the community
>>> because there are some big gaps on how we'd even start to shape the
>>> requirements.
>>>
>>> On Sun, Mar 29, 2020 at 1:28 PM Paul Parker <ni...@gmail.com>
>>> wrote:
>>>
>>>> Hi Mike,
>>>> your alternate suggestion sounds good. But how does it work if I want
>>>> to keep this running continuously? In other words, the delta table should
>>>> be continuously updated. Finally, this is one of the biggest advantages of
>>>> Delta: you can ingest batch and streaming data into one table.
>>>>
>>>> I also think about workarounds (Use Athena, Presto or Redshift with
>>>> Nifi):
>>>> "Here is the list of integrations that enable you to access Delta
>>>> tables from external data processing engines.
>>>>
>>>>    - Presto and Athena to Delta Lake Integration
>>>>    <https://docs.delta.io/latest/presto-integration.html>
>>>>    - Redshift Spectrum to Delta Lake Integration
>>>>    <https://docs.delta.io/latest/redshift-spectrum-integration.html>
>>>>    - Snowflake to Delta Lake Integration
>>>>    <https://docs.delta.io/latest/snowflake-integration.html>
>>>>    - Apache Hive to Delta Lake Integration
>>>>    <https://docs.delta.io/latest/hive-integration.html>"
>>>>
>>>> Source:
>>>> https://docs.delta.io/latest/integrations.html
>>>>
>>>> I am looking forward to further ideas from the community.
>>>>
>>>> Mike Thomsen <mi...@gmail.com> schrieb am So., 29. März 2020,
>>>> 17:23:
>>>>
>>>>> I think there is a connector for Hive that works with Delta. You could
>>>>> try setting up Hive to work with Delta and then using NiFi to feed Hive.
>>>>> Alternatively, you can simply convert the results of the JDBC query into a
>>>>> Parquet file, push to a desired location and run a Spark job to convert
>>>>> from Parquet to Delta (that should be pretty fast because Delta is
>>>>> basically a fork of Parquet).
>>>>>
>>>>> On Fri, Mar 27, 2020 at 2:13 PM Paul Parker <ni...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> We read data via JDBC from a database and want to save the results as
>>>>>> a delta table and then read them again. How can I realize this with Nifi
>>>>>> and Hive or Glue Metastore?
>>>>>>
>>>>>