Posted to dev@flink.apache.org by "Rishabh Maurya (Jira)" <ji...@apache.org> on 2022/09/02 13:59:00 UTC

[jira] [Created] (FLINK-29186) Store summarized results in internal nodes of BKD for time series points

Rishabh Maurya created FLINK-29186:
--------------------------------------

             Summary: Store summarized results in internal nodes of BKD for time series points
                 Key: FLINK-29186
                 URL: https://issues.apache.org/jira/browse/FLINK-29186
             Project: Flink
          Issue Type: New Feature
          Components: API / Core
            Reporter: Rishabh Maurya


Time series points have a timestamp, a measurement and dimensions associated with them. The common queries are range queries on the timestamp, metric aggregations on the measurement and grouping on dimensions, or a similar query that builds a histogram over the timestamp.
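To make the workload concrete, here is a minimal brute-force sketch of the query pattern described above: filter points by a timestamp range, then sum the measurement per dimension value. The `NaiveTimeSeriesQuery` and `Point` classes are hypothetical and not part of the prototype; the point is that this visits every point, which is the per-point work the proposal aims to avoid.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class NaiveTimeSeriesQuery {
    // Hypothetical in-memory representation of one time series point.
    static final class Point {
        final String dimension;   // e.g. a host name
        final long timestamp;
        final long measurement;

        Point(String dimension, long timestamp, long measurement) {
            this.dimension = dimension;
            this.timestamp = timestamp;
            this.measurement = measurement;
        }
    }

    // Range query on timestamp [from, to], sum of measurement, grouped by dimension.
    // Every point is visited, so the cost grows linearly with the number of points.
    static Map<String, Long> sumByDimension(List<Point> points, long from, long to) {
        Map<String, Long> result = new TreeMap<>();
        for (Point p : points) {
            if (p.timestamp >= from && p.timestamp <= to) {
                result.merge(p.dimension, p.measurement, Long::sum);
            }
        }
        return result;
    }
}
```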

*Proposal:*

Prototype can be found [here|https://github.com/rishabhmaurya/lucene/pull/1]

1. Introduce a new time series point field in Lucene, `TSPoint`, which can be added as follows:

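```java
import org.apache.lucene.document.Document;
import org.apache.lucene.document.TSIntPoint; // new class from the prototype branch

Document doc = new Document();
doc.add(new TSIntPoint("tsid1", "cpu", timestamp, measurement));
```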

`tsid1` is the time series ID (TSID) of the point. It is the unit of storage for time series points; in the prototype, each TSID is stored as its own Lucene field.

`timestamp` is the value indexed as the point in the BKD tree.

The full definition can be found [here|https://github.com/rishabhmaurya/lucene/blob/0c07f677c4caf45b1499f236adc0217992e46722/lucene/core/src/java/org/apache/lucene/document/TSIntPoint.java].

2. An interface for decomposable aggregate functions, which can be defined as part of the index configuration.

Sum function:

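```java
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.bkd.BKDSummaryWriter; // new class from the prototype branch

// Sum over 4-byte int summaries; merge(a, b, c) writes the merged summary of a and b into c.
BKDSummaryWriter.SummaryMergeFunction<Integer> mergeFunction =
    new BKDSummaryWriter.SummaryMergeFunction<Integer>() {

      // Every summary occupies a fixed number of bytes in each tree node.
      @Override
      public int getSummarySize() {
        return Integer.BYTES;
      }

      @Override
      public void merge(byte[] a, byte[] b, byte[] c) {
        // Note: int addition can overflow for very large sums.
        packBytes(unpackBytes(a) + unpackBytes(b), c);
      }

      @Override
      public Integer unpackBytes(byte[] val) {
        return NumericUtils.sortableBytesToInt(val, 0);
      }

      @Override
      public void packBytes(Integer val, byte[] res) {
        NumericUtils.intToSortableBytes(val, res, 0);
      }
    };
```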
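The same four methods are enough for the other decomposable aggregations. As a hedged illustration, the sketch below re-declares a hypothetical `SummaryMergeFunction` interface with the same shape (it is not the prototype's `BKDSummaryWriter.SummaryMergeFunction`) and implements `max` and `avg`; `avg` is handled by storing a (sum, count) pair and dividing only when the final result is read. It packs with `java.nio.ByteBuffer` instead of Lucene's `NumericUtils`, which is assumed to be acceptable here because the sketch never compares packed summaries byte-wise.

```java
import java.nio.ByteBuffer;

final class DecomposableSummaries {
    // Hypothetical stand-in mirroring the four methods of the prototype's interface.
    interface SummaryMergeFunction<T> {
        int getSummarySize();
        void merge(byte[] a, byte[] b, byte[] out);
        T unpackBytes(byte[] val);
        void packBytes(T val, byte[] res);
    }

    // Average kept as a (sum, count) pair: 8 bytes for the sum, 8 for the count.
    static final class SumCount {
        final long sum;
        final long count;
        SumCount(long sum, long count) { this.sum = sum; this.count = count; }
        double average() { return count == 0 ? Double.NaN : (double) sum / count; }
    }

    static final SummaryMergeFunction<SumCount> AVG = new SummaryMergeFunction<SumCount>() {
        @Override public int getSummarySize() { return 2 * Long.BYTES; }

        @Override public void merge(byte[] a, byte[] b, byte[] out) {
            SumCount x = unpackBytes(a), y = unpackBytes(b);
            // Sums and counts add independently, so merging is associative.
            packBytes(new SumCount(x.sum + y.sum, x.count + y.count), out);
        }

        @Override public SumCount unpackBytes(byte[] val) {
            ByteBuffer buf = ByteBuffer.wrap(val);
            return new SumCount(buf.getLong(), buf.getLong());
        }

        @Override public void packBytes(SumCount val, byte[] res) {
            ByteBuffer.wrap(res).putLong(val.sum).putLong(val.count);
        }
    };

    static final SummaryMergeFunction<Long> MAX = new SummaryMergeFunction<Long>() {
        @Override public int getSummarySize() { return Long.BYTES; }

        @Override public void merge(byte[] a, byte[] b, byte[] out) {
            packBytes(Math.max(unpackBytes(a), unpackBytes(b)), out);
        }

        @Override public Long unpackBytes(byte[] val) { return ByteBuffer.wrap(val).getLong(); }

        @Override public void packBytes(Long val, byte[] res) { ByteBuffer.wrap(res).putLong(val); }
    };

    // Helper: allocate a summary of the right size and pack one value into it.
    static <T> byte[] pack(SummaryMergeFunction<T> f, T value) {
        byte[] out = new byte[f.getSummarySize()];
        f.packBytes(value, out);
        return out;
    }
}
```

Keeping `avg` as a pair, rather than storing a running average, is what makes it decomposable: two averages alone cannot be merged correctly without knowing how many points each one covers.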

 

3. A new per-`LeafReader` query that performs range queries on `TSPoint` fields and retrieves the summarized results:

 

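```java
// leafReader is the reader for one segment, e.g. obtained via LeafReaderContext.reader()
LeafReader leafReader = leafReaderContext.reader();
PointValues points = leafReader.getPointValues("tsid1");

// Range query on timestamp for the time series "tsid1"
TSPointQuery tsPointQuery = new TSPointQuery("tsid1", lowerBoundTimestamp, upperBoundTimestamp);

// Packed summary of the matching points, merged with the merge function from step 2
byte[] res = tsPointQuery.getSummary((BKDWithSummaryReader.BKDSummaryTree) points.getPointTree(), mergeFunction);
int sum = mergeFunction.unpackBytes(res); // decode, assuming the Integer sum function
```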
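`getSummary` returns one packed summary per `LeafReader`, i.e. per segment. Because the merge function is decomposable, a result for the whole index can be obtained by folding the per-segment summaries with the same `merge` method. The sketch below assumes the per-segment byte arrays have already been collected; `CrossSegmentSummary`, its `combine` helper, and the `SumFunction` class (which re-implements the sum function from step 2 with `ByteBuffer` so it runs without Lucene) are all hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.List;

final class CrossSegmentSummary {
    // Hypothetical stand-in for the sum merge function from step 2 (4-byte int summaries).
    static final class SumFunction {
        int getSummarySize() { return Integer.BYTES; }
        void merge(byte[] a, byte[] b, byte[] c) { packBytes(unpackBytes(a) + unpackBytes(b), c); }
        int unpackBytes(byte[] val) { return ByteBuffer.wrap(val).getInt(); }
        void packBytes(int val, byte[] res) { ByteBuffer.wrap(res).putInt(val); }
    }

    // Folds per-segment summaries into one; an empty list yields the identity (0 for sum).
    static int combine(List<byte[]> perSegment, SumFunction fn) {
        byte[] acc = new byte[fn.getSummarySize()];
        fn.packBytes(0, acc);
        byte[] tmp = new byte[fn.getSummarySize()];
        for (byte[] segmentSummary : perSegment) {
            fn.merge(acc, segmentSummary, tmp);
            // Swap buffers so acc always holds the running total without aliasing merge's output.
            byte[] t = acc;
            acc = tmp;
            tmp = t;
        }
        return fn.unpackBytes(acc);
    }
}
```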

Instead of BKDReader and BKDWriter, we will use [BKDSummaryWriter|https://github.com/rishabhmaurya/lucene/blob/0c07f677c4caf45b1499f236adc0217992e46722/lucene/core/src/java/org/apache/lucene/util/bkd/BKDSummaryWriter.java] and [BKDWithSummaryReader|https://github.com/rishabhmaurya/lucene/blob/0c07f677c4caf45b1499f236adc0217992e46722/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWithSummaryReader.java], which support storing summaries in the internal nodes of the tree.

The changes to the `IntersectVisitor` interface can be found [here|https://github.com/rishabhmaurya/lucene/blob/0c07f677c4caf45b1499f236adc0217992e46722/lucene/core/src/java/org/apache/lucene/index/PointValues.java#L320-L337].
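The core idea of keeping summaries in internal nodes can be shown with a simplified one-dimensional tree: leaves hold blocks of points sorted by timestamp, each internal node stores the merged summary (here, the sum of measurements) of its subtree, and a range query uses a node's stored summary whenever the node's timestamp range lies entirely inside the query range, descending only into nodes that partially overlap it. This is a hedged in-memory sketch, not the prototype's on-disk format; the `SummaryTree` class, the leaf size, and the `leafVisits` counter are hypothetical.

```java
final class SummaryTree {
    // Hypothetical node: covers points [from, to) and stores their summed measurement.
    private static final class Node {
        final int from, to;
        final long minTs, maxTs, sum;
        final Node left, right; // both null for a leaf

        Node(int from, int to, long minTs, long maxTs, long sum, Node left, Node right) {
            this.from = from; this.to = to;
            this.minTs = minTs; this.maxTs = maxTs; this.sum = sum;
            this.left = left; this.right = right;
        }
    }

    private final long[] timestamps;   // assumed sorted ascending
    private final long[] measurements; // measurements[i] belongs to timestamps[i]
    private final int leafSize;
    private final Node root;
    int leafVisits; // leaves whose points had to be scanned one by one

    SummaryTree(long[] timestamps, long[] measurements, int leafSize) {
        this.timestamps = timestamps;
        this.measurements = measurements;
        this.leafSize = Math.max(1, leafSize);
        this.root = timestamps.length == 0 ? null : build(0, timestamps.length);
    }

    // Bottom-up build: an internal summary is the merge (sum) of its children's summaries.
    private Node build(int from, int to) {
        if (to - from <= leafSize) {
            long sum = 0;
            for (int i = from; i < to; i++) sum += measurements[i];
            return new Node(from, to, timestamps[from], timestamps[to - 1], sum, null, null);
        }
        int mid = (from + to) >>> 1;
        Node l = build(from, mid), r = build(mid, to);
        return new Node(from, to, l.minTs, r.maxTs, l.sum + r.sum, l, r);
    }

    // Sum of measurements for timestamps in [lo, hi].
    long sum(long lo, long hi) {
        return root == null ? 0 : sum(root, lo, hi);
    }

    private long sum(Node n, long lo, long hi) {
        if (n.maxTs < lo || n.minTs > hi) return 0;       // cell outside the query
        if (lo <= n.minTs && n.maxTs <= hi) return n.sum; // cell inside: use the stored summary
        if (n.left == null) {                              // crossing leaf: scan its points
            leafVisits++;
            long s = 0;
            for (int i = n.from; i < n.to; i++) {
                if (timestamps[i] >= lo && timestamps[i] <= hi) s += measurements[i];
            }
            return s;
        }
        return sum(n.left, lo, hi) + sum(n.right, lo, hi);
    }
}
```

Because the points are sorted, at most the two leaves containing the range boundaries are scanned point by point; everything in between is answered from precomputed summaries along O(log n) nodes, instead of reading every matching document.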
h4. Comparison with DocValues

Below is a comparison of the unit-test results for the [DocValues|https://github.com/rishabhmaurya/lucene/pull/1/commits/157215cd2c4787787748625e10b58001583c6e6e#diff-87c31ac3b1cd1ef45b15c84d9cf1c5bab03feb94f199256f859f14ed4747abd2R129] approach and the [TSPoint|https://github.com/rishabhmaurya/lucene/pull/1/commits/157215cd2c4787787748625e10b58001583c6e6e#diff-87c31ac3b1cd1ef45b15c84d9cf1c5bab03feb94f199256f859f14ed4747abd2R52] approach:

This test ingests {{10000000}} docs into a single TSID and runs a range query on the timestamp 100 times against the same TSID. The merge function used is {{sum}}.
||Metric||DocValues approach||TSPoint approach||
|Indexing time|42948 ms|32985 ms|
|Matching docs count|1304624|8784032|
|Segments|3|10|
|Disk accesses|1304624|302|
|Search time|12382 ms|50 ms|

This is not an apples-to-apples comparison, since the DocValues approach ended up with 3 segments whereas the TSPoint approach ended up with 10.
h4. Limitations of this feature
 * Document deletion is currently not supported. We need to evaluate how important it is and possibly find a way to support it in the future.
 * Only [decomposable|https://en.wikipedia.org/wiki/Aggregate_function#Decomposable_aggregate_functions] aggregation functions can be supported, e.g. min, max, sum, avg and count.
 * Range queries will only be supported on {{timestamp}}.
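The restriction to decomposable functions follows from how the tree is read: an internal node only sees its children's summaries, never the raw points. A small counterexample with the median, which is not decomposable, shows what goes wrong, while the sum of the same data merges correctly. The `MedianIsNotDecomposable` class and its helpers are illustrative only.

```java
import java.util.Arrays;

final class MedianIsNotDecomposable {
    // Lower median: middle element for odd length, lower of the two middle ones for even length.
    static long median(long[] values) {
        long[] copy = values.clone();
        Arrays.sort(copy);
        return copy[(copy.length - 1) / 2];
    }

    static long sum(long[] values) {
        return Arrays.stream(values).sum();
    }

    // Concatenates two children's raw points, as a parent node would cover them.
    static long[] concat(long[] a, long[] b) {
        long[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }
}
```

With children `{1, 2, 3}` and `{4, 100, 200}`, the median of all six points is 3, but the median of the children's medians `{2, 100}` is 2, so no merge function over the child summaries alone can recover it; the sums 6 and 304, by contrast, merge to exactly 310.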

h4. TODOs
 * Support for multiple TSIDs. For now, each time series needs its own field named after its TSID.
 * Segment merging for BKD trees with summaries. Currently, the unit tests disable merging, search across multiple segments and combine the per-segment results.
 * A pluggable function for merging two {{TSPoint}} values. Currently it is hardcoded in {{FieldInfo.java}}, which isn't the right place to define it.
 * Measurement compression in BKD. I'm considering delta encoding to store measurement values and summaries when packing the summaries associated with the tree's nodes.
 * Efficiently persist the first and last docID in the internal nodes of the BKD tree with summaries. This would allow precomputed summaries to be used to skip over batches of documents when iterating with a {{DocIdSetIterator}}.
 * Benchmark against a real time series dataset.
 ** Compare against the SortedDocValues approach.
 ** Compare against other time series databases.
 * Evaluate support for deleting a document, a time series or a batch of documents (matching a timestamp range).
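For the measurement-compression item, delta encoding stores the first value and then only the difference between consecutive values, which stays small for slowly changing series. The sketch below pairs it with zigzag and variable-length byte encoding so that small negative deltas also take few bytes. This is one common way to realize delta encoding, assumed here for illustration; it is not necessarily what the prototype will use, and the `DeltaCodec` class is hypothetical.

```java
import java.io.ByteArrayOutputStream;

final class DeltaCodec {
    // Encodes each value as a zigzag varint of its difference to the previous value.
    static byte[] encode(long[] values) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long prev = 0;
        for (long v : values) {
            long delta = v - prev;                  // wraps on overflow; decode wraps back
            long zz = (delta << 1) ^ (delta >> 63); // zigzag: small |delta| -> small unsigned value
            while ((zz & ~0x7FL) != 0) {
                out.write((int) ((zz & 0x7F) | 0x80)); // 7 payload bits + continuation bit
                zz >>>= 7;
            }
            out.write((int) zz);
            prev = v;
        }
        return out.toByteArray();
    }

    // Decodes count values produced by encode.
    static long[] decode(byte[] bytes, int count) {
        long[] values = new long[count];
        long prev = 0;
        int pos = 0;
        for (int i = 0; i < count; i++) {
            long zz = 0;
            int shift = 0;
            byte b;
            do {
                b = bytes[pos++];
                zz |= (long) (b & 0x7F) << shift;
                shift += 7;
            } while ((b & 0x80) != 0);
            long delta = (zz >>> 1) ^ -(zz & 1); // undo zigzag
            prev += delta;
            values[i] = prev;
        }
        return values;
    }
}
```

For a series such as `{1000, 1001, 1002, 1003}` this takes 5 bytes (2 for the first value, 1 per delta) instead of 32 bytes as raw longs.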

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)