Posted to user@kudu.apache.org by Ehud Eshet <Eh...@imperva.com> on 2017/07/31 13:13:22 UTC

Database audit analytics - Kudu vs. Parquet

Hi all,

*High level use case description*

Database Activity Monitoring captures audit records from various relational and NoSQL databases.
Large customers may have thousands of monitored databases that together generate billions to trillions of audit records per day.
Audit records are loaded once, queried many times, and never updated.
Customers may define a single audit policy that audits all DB activity, or multiple audit policies that each audit a subset of DB activity.
Audit records are highly structured. Each record contains:

*        Event time stamp and sequence id.

*        Database identifiers such as: Server IP, Listener port, Instance Name, DB Name, DBMS type, DB account.

*        Client identifiers such as: Client IP, Client port, OS User, Host, Application, Connection ID.

*        Operation identifiers such as: Audit policy, Operation, Query text, Bind variable values, Object schema, Object name, Object type, Error code.

*        Counters such as: Response time, Response size, Affected rows.
The actual structure contains more than 50 columns. Most of them are string identifiers (dimensions).

*Audit analytics using Spark SQL with Parquet*

Parquet columnar encoding and compression provide a compression ratio of 1 to 100 on my typical data.
The encoding is selected automatically per column by the Parquet loader, based on the actual data loaded into the current 128 MB row group.
Multiple encodings are supported for the same column, e.g. dictionary followed by RLE, which work best together when a column has a small number of distinct long strings and many repeated values.
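
To make the dictionary-then-RLE combination concrete, here is a minimal pure-Python sketch. It only illustrates the idea and is not Parquet's actual on-disk format; the sample column and the helper names `dictionary_encode` and `run_length_encode` are made up for this example.

```python
from itertools import groupby


def dictionary_encode(values):
    """Map each distinct value to a small integer code, in first-seen order."""
    dictionary = {}
    codes = []
    for value in values:
        # A new value gets the next free code; a known value reuses its code.
        code = dictionary.setdefault(value, len(dictionary))
        codes.append(code)
    # Position i of the returned list holds the string for code i.
    return list(dictionary), codes


def run_length_encode(codes):
    """Collapse consecutive repeats into (code, run_length) pairs."""
    return [(code, sum(1 for _ in run)) for code, run in groupby(codes)]


# Hypothetical column of long, highly repetitive query texts.
column = (
    ["SELECT * FROM accounts WHERE id = ?"] * 4
    + ["UPDATE accounts SET balance = ? WHERE id = ?"] * 2
    + ["SELECT * FROM accounts WHERE id = ?"] * 3
)

dictionary, codes = dictionary_encode(column)
runs = run_length_encode(codes)
print(dictionary)
print(runs)  # [(0, 4), (1, 2), (0, 3)]
```

Each long string is stored once in the dictionary, and the nine column values shrink to three (code, run length) pairs.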
The Spark-Parquet integration allows each Spark worker node to read Parquet file blocks that are stored on the local HDFS node.
Neither range partitions nor hash partitions are supported.
Multi-level list partitions are supported. Adding a partition is just a matter of creating an additional HDFS sub-directory that follows the appropriate naming convention.
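
As a rough illustration of that naming convention, the sketch below builds a two-level partition path, assuming the Hive-style `key=value` directory layout that Spark's partition discovery recognizes. The table root `/warehouse/audit` and the partition columns `policy` and `day` are hypothetical.

```python
from datetime import date
from pathlib import PurePosixPath


def partition_dir(table_root, policy, day):
    """Build a Hive-style key=value sub-directory path for one partition."""
    return PurePosixPath(table_root) / f"policy={policy}" / f"day={day.isoformat()}"


path = partition_dir("/warehouse/audit", "pci_policy", date(2017, 7, 31))
print(path)  # /warehouse/audit/policy=pci_policy/day=2017-07-31
```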
The main pain point with Parquet files is that you cannot update a file after closing it.
To append records, you can only add a new Parquet file to the same HDFS sub-directory.
As long as a Parquet file is still open, it is not available for reporting.
Parquet files are not necessarily sorted. Even when they are, Spark SQL will not take advantage of the sort order when filtering on the sorted leading columns.

*Audit analytics using Spark SQL and Kudu*

Kudu solves the main pain point of static Parquet files.
However, it introduces other problems:

1.      No support for combining RLE and dictionary encoding on the same string column (in fact, RLE is not supported for string columns at all, even on its own).
As a result, the same typical data gets a lower compression ratio (1 to 14).

2.      Incomplete Spark SQL integration

a.      Tables can be created via Impala but not via Spark SQL.

b.      Spark SQL can append rows to a Kudu table.
However, I did not find documentation on how to tell Spark SQL to add range partitions when needed.

c.      Predicates are pushed down by Spark SQL to Kudu.
However, aggregations are not. All qualifying rows are returned to Spark SQL, instead of each tablet returning a partially aggregated (and much smaller) row set.

3.      A primary key is mandatory even when rows will never be updated or deleted (historical data is purged by dropping an entire range partition).
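
To illustrate what item 2c asks for, here is a small pure-Python sketch of per-tablet partial aggregation followed by a final merge. It does not use any Kudu or Spark API; the tablet names, the sample rows and the helper functions are hypothetical.

```python
from collections import Counter

# Hypothetical rows held by two tablets: (audit_policy, affected_rows).
tablet_rows = {
    "tablet-1": [("pci", 10), ("pci", 5), ("hr", 1)],
    "tablet-2": [("pci", 2), ("hr", 7), ("hr", 3)],
}


def partial_aggregate(rows):
    """Sum affected_rows per policy for the rows of a single tablet."""
    totals = Counter()
    for policy, affected in rows:
        totals[policy] += affected
    return totals


def merge_partials(partials):
    """Combine per-tablet partial sums into the final result."""
    final = Counter()
    for partial in partials:
        final.update(partial)  # Counter.update adds counts together
    return final


partials = [partial_aggregate(rows) for rows in tablet_rows.values()]
result = merge_partials(partials)

# Rows that cross the tablet boundary without and with aggregation push down.
rows_shipped_without_pushdown = sum(len(rows) for rows in tablet_rows.values())
rows_shipped_with_pushdown = sum(len(partial) for partial in partials)

print(dict(result))
print(rows_shipped_without_pushdown, rows_shipped_with_pushdown)
```

With push down, each tablet would ship one row per distinct policy instead of one row per audit record, which matters far more at billions of rows than in this toy data.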

*Apache Kudu wish list*

*        List partitions - all audit reports must filter on audit policy (equality or IN-list predicate).
Currently, I can use hash partitioning on policy.
However, the number of buckets cannot be changed after table creation.
I would like any new policy to automatically create a new partition.

*        Interval-based range partitions - let me just define the interval in microseconds (86,400,000,000 for daily partitions).
Then Kudu could add partitions automatically when rows for a new day arrive.

*        Automatic compression of dictionary pages when string length > 100

*        Allow RLE encoding of the dictionary indexes of string columns.

*        Allow delta encoding for sequence and timestamp columns.

*        Aggregation push down.
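
As a sketch of the interval-based range partition item, the snippet below derives the daily partition bounds for an event timestamp from the interval alone. It is plain Python, not an existing Kudu feature or API; `partition_bounds` and `ensure_partition` are hypothetical helpers, and the set of partitions merely stands in for table metadata.

```python
from datetime import datetime, timezone

MICROS_PER_DAY = 86_400_000_000  # interval from the wish-list item


def partition_bounds(event_time_micros, interval_micros=MICROS_PER_DAY):
    """Return the [lower, upper) range partition that contains the timestamp."""
    lower = (event_time_micros // interval_micros) * interval_micros
    return lower, lower + interval_micros


def ensure_partition(existing, event_time_micros):
    """Add the covering partition to `existing` if absent; return its bounds."""
    bounds = partition_bounds(event_time_micros)
    existing.add(bounds)  # set semantics make this a no-op for known days
    return bounds


# 2017-07-31 13:13:22 UTC expressed as microseconds since the Unix epoch.
event = datetime(2017, 7, 31, 13, 13, 22, tzinfo=timezone.utc)
event_micros = int(event.timestamp()) * 1_000_000

partitions = set()
lower, upper = ensure_partition(partitions, event_micros)
ensure_partition(partitions, event_micros + 1_000_000)  # same day, no new partition

print(datetime.fromtimestamp(lower / 1_000_000, tz=timezone.utc))
print(len(partitions))
```

The lower bound is simply the timestamp rounded down to a multiple of the interval, so a row for a new day maps to a partition that does not exist yet and could be created on arrival.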


Ehud Eshet | Senior Researcher
Ehud.Eshet@imperva.com | o: +972 3-684-0114 | m: +972 52-446-1979


Re: Database audit analytics - Kudu vs. Parquet

Posted by Todd Lipcon <to...@cloudera.com>.
Hey Ehud,

Thanks for the writeup. I left a couple comments inline below:

On Mon, Jul 31, 2017 at 6:13 AM, Ehud Eshet <Eh...@imperva.com> wrote:

> Hi all,
>
> *High level use case description*
>
> Database Activity Monitoring captures audit records from various
> relational and NoSQL databases.
>
> Big customers may have thousands of different monitored databases that may
> generate (together) billions to trillion audit records per day.
> Audit records are loaded once, queried many times, and never being updated.
>
> Customers may define one audit policy to audit all DB activity or multiple
> audit policies on which each will audit a sub-set of DB activity.
>
> Audit records are highly structured. Each record contains:
>
> ·        Event time stamp and sequence id.
>
> ·        Database identifiers such as: Server IP, Listener port, Instance
> Name, DB Name, DBMS type, DB account.
>
> ·        Client identifiers such as: Client IP, Client port, OS User,
> Host, Application, Connection ID.
>
> ·        Operation identifiers such as: Audit policy, Operation, Query
> text, Bind variables values, Object schema, Object name, Object type, Error
> code.
>
> ·        Counters such as: Response time, Response size, Affected rows.
>
> The actual structure contains more than 50 columns. Most of them are
> string identifiers (dimensions).
>
> *Audit analytics using Spark SQL with Parquet*
>
> Parquet columnar encoding and compression provides a compression ratio of
> 1 to 100 on my typical data.
> Encoding is selected automatically per column by the parquet loader based
> on the actual data loaded into the current 128MB rows set.
> Multiple encodings are supported for same column. E.g. Dictionary and then
> RLE which are best together when a column has both small amount of long
> strings and repeating values.
>
> Spark Parquet integration allows each Spark worker node to read Parquet
> file blocks that are stored on the local HDFS node.
>
> No range partitions nor hash partitions are supported.
> Multi-level list partitions are supported. Adding a partition is just a
> matter of creating additional HDFS sub-directory with the appropriate
> naming conventions.
>
> The main pain point with Parquet files is that you cannot update a file
> after closing it.
> To append records, you can only add a new Parquet file on the same HDFS
> sub-directory.
> As long as Parquet file is still open, it is not available for reporting.
>
> Parquet files are not necessarily sorted. Even when they are, Spark SQL
> will not take advantage on sort order while filtering by sorted leading
> columns.
>
> *Audit analytics using Spark SQL and Kudu*
>
> Kudu solve the main pain point of static Parquet files.
>
> However, it introduces other problems:
>
> 1.      No support for combination of RLE and dictionary encoding on same
> String columns (actually RLE is not supported even alone for string
> columns).
> Same typical data suffer from lower compression ratio (1 to 14).
>

When DICT encoding is chosen for a string column (the default), Kudu does
actually compress the dictionary codewords using bitshuffle encoding. This
should provide all of the benefits of bitpacking and some of the benefits of
RLE as well. I think if you have runs longer than 32 entries or so, you'll
start to benefit from this aspect of bitshuffle.
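
A simplified illustration of why bit transposition helps with small, repetitive dictionary codes follows. This is not Kudu's bitshuffle implementation: `bit_planes` is a hypothetical helper, the codes and the run length of 64 are made up, and `zlib` is used only as a stand-in for a general-purpose compressor.

```python
import zlib


def bit_planes(codes, width=32):
    """Transpose fixed-width integers into one bit string per bit position."""
    planes = []
    for bit in range(width):
        plane = bytearray((len(codes) + 7) // 8)
        for i, code in enumerate(codes):
            if (code >> bit) & 1:
                plane[i // 8] |= 1 << (i % 8)
        planes.append(bytes(plane))
    return planes


# Hypothetical dictionary codes: 4 distinct values in long runs.
codes = [0] * 64 + [3] * 64 + [1] * 64 + [2] * 64

planes = bit_planes(codes)
shuffled = b"".join(planes)
plain = b"".join(code.to_bytes(4, "little") for code in codes)

# Only the two lowest bit planes carry information; the other 30 are all zero.
nonzero_planes = [bit for bit, plane in enumerate(planes) if any(plane)]
print(nonzero_planes)  # [0, 1]

# Compressed sizes of the plain and the transposed layouts, for comparison.
print(len(zlib.compress(plain)), len(zlib.compress(shuffled)))
```

After transposition the unused high bits collapse into long zero runs (the bitpacking-like effect), and runs of equal codes become runs of identical bytes within each remaining plane (the RLE-like effect).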


> 2.      Incomplete Spark SQL integration
>
> a.      Tables can be created via Impala but not via Spark SQL.
>
> b.      Spark SQL can append rows into Kudu table.
> However, I did not find documentation for how to tell Spark SQL to add
> range partitions when needed.
>

That's correct -- the above two need to be done using Scala code.

> c.      Predicates are pushed by Spark SQL down to Kudu.
> However, aggregations are not. All qualified rows are returned to Spark
> SQL instead of each tablet will return a partially aggregated (and much
> smaller) rows set.
>

True, but the idea is that each Spark SQL task (unit of work) performs its
local aggregation, and hopefully should be reading local Kudu data. So, the
non-aggregated data does not need to flow over the network.

Currently, however, locality is broken in the Spark-Kudu integration:
https://issues.apache.org/jira/browse/KUDU-1454
Fixing this should provide a big performance benefit for Spark SQL.


> 3.      Primary key is mandatory even when rows will never be updated or
> deleted (historical data is purged by dropping entire range partition).
>

True. This is a limitation that we'd like to address:
https://issues.apache.org/jira/browse/KUDU-1879

> *Apache Kudu wish list*
>
> ·        List partitions – all audit reports must filter on audit policy
> (equal or IN list predicate).
> Currently, I can use hash partitioning on policy.
> However, number of buckets cannot be changed after table creation.
> I would like any new policy to automatically create a new partition.
>
> ·        Interval based range partitions – let me just define the
> interval in micro seconds (86,400,000,000 for daily partitions).
> Then Kudu could add partitions automatically when rows for a new day
> arrive.
>
> ·        Automatic compression of dictionary pages when string length >
> 100
>
> ·        Allow RLE encoding of the indexes in the dictionary of string
> columns.
>
> ·        Allow delta encoding for sequences and timestamp columns.
>
> ·        Aggregation push down.
>
>
>

Good list of items. I think several are already captured in the JIRA but
none is actively being worked on.

It sounds like you are pretty advanced in your understanding of this
problem domain. Any interest in contributing towards any of the above
features? In particular, help fixing the Spark locality issue would be
great, and I don't think it is too complicated to tackle as a first
contribution.

-Todd
-- 
Todd Lipcon
Software Engineer, Cloudera